Posted to commits@ariatosca.apache.org by mxmrlv <gi...@git.apache.org> on 2017/06/14 14:18:56 UTC

[GitHub] incubator-ariatosca pull request #156: ARIA-278 remove core tasks

GitHub user mxmrlv opened a pull request:

    https://github.com/apache/incubator-ariatosca/pull/156

    ARIA-278 remove core tasks

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apache/incubator-ariatosca ARIA-278-Remove-core-tasks

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-ariatosca/pull/156.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #156
    
----
commit f36fe86c17207a1349821ec599b349d94d5b2d6a
Author: max-orlov <ma...@gigaspaces.com>
Date:   2017-06-11T16:05:35Z

    wip

commit 62fa985c6bf1e888710dd109a35cc122d7841ba2
Author: max-orlov <ma...@gigaspaces.com>
Date:   2017-06-12T16:47:50Z

    wip2

commit 8044a035fdb7f88e871e3a643b47b3e42c43da67
Author: max-orlov <ma...@gigaspaces.com>
Date:   2017-06-13T12:56:44Z

    test fixes, moved close executor to be handled inside the executor

commit 979a4b445bf0261db2bb2632cd93e88ea6d61222
Author: max-orlov <ma...@gigaspaces.com>
Date:   2017-06-13T13:49:15Z

    wip on executor tests

commit 092b45f0de707cc9c1ed8bebe594bb0b7bb5846f
Author: max-orlov <ma...@gigaspaces.com>
Date:   2017-06-13T17:33:34Z

    graph work

commit 58e212c7dbdea7aa9b532bba9fea4a5359a49019
Author: max-orlov <ma...@gigaspaces.com>
Date:   2017-06-14T14:17:16Z

    tasks no longer execute their executor

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-ariatosca pull request #156: ARIA-278 remove core tasks

Posted by mxmrlv <gi...@git.apache.org>.
GitHub user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/156#discussion_r122958272
  
    --- Diff: aria/modeling/orchestration.py ---
    @@ -392,8 +397,66 @@ def abort(message=None):
         def retry(message=None, retry_interval=None):
             raise TaskRetryException(message, retry_interval=retry_interval)
     
    +    @declared_attr
    +    def dependency_fk(self):
    +        return relationship.foreign_key('task', nullable=True)
    +
    +    @declared_attr
    +    def dependencies(cls):
    +        # symmetric relationship causes funky graphs
    +        return relationship.one_to_many_self(cls, 'dependency_fk')
    +
    +    def has_ended(self):
    +        return self.status in (self.SUCCESS, self.FAILED)
    +
    +    def is_waiting(self):
    +        if self.stub_type:
    +            return not self.has_ended()
    +        else:
    +            return self.status in (self.PENDING, self.RETRYING)
    +
    +    @classmethod
    +    def from_api_task(cls, api_task, executor, **kwargs):
    +        from aria.orchestrator import context
    +        instantiation_kwargs = {}
    +
    +        if hasattr(api_task.actor, 'outbound_relationships'):
    --- End diff --
    
    it does not
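
The waiting rule in the quoted hunk can be tried in isolation. The sketch below is a stand-alone illustration under stated assumptions, not the ARIA model: the `Task` class, the status strings, and the `'start_workflow'` stub name are made up for the example; only the `has_ended` / `is_waiting` logic mirrors the diff.

```python
# Assumed status values; the diff only shows the attribute names.
PENDING, RETRYING, SENT, STARTED, SUCCESS, FAILED = (
    'pending', 'retrying', 'sent', 'started', 'success', 'failed')


class Task(object):
    """Hypothetical stand-in for the task model in the diff."""

    def __init__(self, status=PENDING, stub_type=None):
        self.status = status
        self.stub_type = stub_type

    def has_ended(self):
        return self.status in (SUCCESS, FAILED)

    def is_waiting(self):
        # A stub task counts as waiting until it has ended, whatever its status.
        if self.stub_type:
            return not self.has_ended()
        # A regular task waits only before it is sent or while retrying.
        return self.status in (PENDING, RETRYING)
```

Under these assumptions, a regular task in the started state is not waiting, while a stub task in the same state still is.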



[GitHub] incubator-ariatosca pull request #156: ARIA-278 remove core tasks

Posted by mxmrlv <gi...@git.apache.org>.
GitHub user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/156#discussion_r122955823
  
    --- Diff: aria/orchestrator/workflows/core/events_handler.py ---
    @@ -30,120 +30,122 @@
     
     
     @events.sent_task_signal.connect
    -def _task_sent(task, *args, **kwargs):
    -    with task._update():
    -        task.status = task.SENT
    +def _task_sent(ctx, *args, **kwargs):
    +    with ctx.track_changes:
    +        ctx.task.status = ctx.task.SENT
     
     
     @events.start_task_signal.connect
    -def _task_started(task, *args, **kwargs):
    -    with task._update():
    -        task.started_at = datetime.utcnow()
    -        task.status = task.STARTED
    -    _update_node_state_if_necessary(task, is_transitional=True)
    +def _task_started(ctx, *args, **kwargs):
    +    with ctx.track_changes:
    +        ctx.task.started_at = datetime.utcnow()
    +        ctx.task.status = ctx.task.STARTED
    +        _update_node_state_if_necessary(ctx, is_transitional=True)
     
     
     @events.on_failure_task_signal.connect
    -def _task_failed(task, exception, *args, **kwargs):
    -    with task._update():
    +def _task_failed(ctx, exception, *args, **kwargs):
    +    with ctx.track_changes:
             should_retry = all([
                 not isinstance(exception, exceptions.TaskAbortException),
    -            task.attempts_count < task.max_attempts or task.max_attempts == task.INFINITE_RETRIES,
    -            # ignore_failure check here means the task will not be retries and it will be marked
    +            ctx.task.attempts_count < ctx.task.max_attempts or
    +            ctx.task.max_attempts == ctx.task.INFINITE_RETRIES,
    +            # ignore_failure check here means the task will not be retried and it will be marked
                 # as failed. The engine will also look at ignore_failure so it won't fail the
                 # workflow.
    -            not task.ignore_failure
    +            not ctx.task.ignore_failure
             ])
             if should_retry:
                 retry_interval = None
                 if isinstance(exception, exceptions.TaskRetryException):
                     retry_interval = exception.retry_interval
                 if retry_interval is None:
    -                retry_interval = task.retry_interval
    -            task.status = task.RETRYING
    -            task.attempts_count += 1
    -            task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval)
    +                retry_interval = ctx.task.retry_interval
    +            ctx.task.status = ctx.task.RETRYING
    +            ctx.task.attempts_count += 1
    +            ctx.task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval)
             else:
    -            task.ended_at = datetime.utcnow()
    -            task.status = task.FAILED
    +            ctx.task.ended_at = datetime.utcnow()
    +            ctx.task.status = ctx.task.FAILED
     
     
     @events.on_success_task_signal.connect
    -def _task_succeeded(task, *args, **kwargs):
    -    with task._update():
    -        task.ended_at = datetime.utcnow()
    -        task.status = task.SUCCESS
    +def _task_succeeded(ctx, *args, **kwargs):
    +    with ctx.track_changes:
    +        ctx.task.ended_at = datetime.utcnow()
    +        ctx.task.status = ctx.task.SUCCESS
     
    -    _update_node_state_if_necessary(task)
    +        _update_node_state_if_necessary(ctx)
     
     
     @events.start_workflow_signal.connect
     def _workflow_started(workflow_context, *args, **kwargs):
    -    execution = workflow_context.execution
    -    # the execution may already be in the process of cancelling
    -    if execution.status in (execution.CANCELLING, execution.CANCELLED):
    -        return
    -    execution.status = execution.STARTED
    -    execution.started_at = datetime.utcnow()
    -    workflow_context.execution = execution
    +    with workflow_context.track_changes:
    +        execution = workflow_context.execution
    +        # the execution may already be in the process of cancelling
    +        if execution.status in (execution.CANCELLING, execution.CANCELLED):
    +            return
    +        execution.status = execution.STARTED
    +        execution.started_at = datetime.utcnow()
     
     
     @events.on_failure_workflow_signal.connect
     def _workflow_failed(workflow_context, exception, *args, **kwargs):
    -    execution = workflow_context.execution
    -    execution.error = str(exception)
    -    execution.status = execution.FAILED
    -    execution.ended_at = datetime.utcnow()
    -    workflow_context.execution = execution
    +    with workflow_context.track_changes:
    +        execution = workflow_context.execution
    +        execution.error = str(exception)
    +        execution.status = execution.FAILED
    +        execution.ended_at = datetime.utcnow()
     
     
     @events.on_success_workflow_signal.connect
     def _workflow_succeeded(workflow_context, *args, **kwargs):
    -    execution = workflow_context.execution
    -    execution.status = execution.SUCCEEDED
    -    execution.ended_at = datetime.utcnow()
    -    workflow_context.execution = execution
    +    with workflow_context.track_changes:
    +        execution = workflow_context.execution
    +        execution.status = execution.SUCCEEDED
    +        execution.ended_at = datetime.utcnow()
     
     
     @events.on_cancelled_workflow_signal.connect
     def _workflow_cancelled(workflow_context, *args, **kwargs):
    -    execution = workflow_context.execution
    -    # _workflow_cancelling function may have called this function already
    -    if execution.status == execution.CANCELLED:
    -        return
    -    # the execution may have already been finished
    -    elif execution.status in (execution.SUCCEEDED, execution.FAILED):
    -        _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
    -    else:
    -        execution.status = execution.CANCELLED
    -        execution.ended_at = datetime.utcnow()
    -        workflow_context.execution = execution
    +    with workflow_context.track_changes:
    +        execution = workflow_context.execution
    +        # _workflow_cancelling function may have called this function already
    +        if execution.status == execution.CANCELLED:
    +            return
    +        # the execution may have already been finished
    +        elif execution.status in (execution.SUCCEEDED, execution.FAILED):
    +            _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
    +        else:
    +            execution.status = execution.CANCELLED
    +            execution.ended_at = datetime.utcnow()
     
     
     @events.on_cancelling_workflow_signal.connect
     def _workflow_cancelling(workflow_context, *args, **kwargs):
    -    execution = workflow_context.execution
    -    if execution.status == execution.PENDING:
    -        return _workflow_cancelled(workflow_context=workflow_context)
    -    # the execution may have already been finished
    -    elif execution.status in (execution.SUCCEEDED, execution.FAILED):
    -        _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
    -    else:
    -        execution.status = execution.CANCELLING
    -        workflow_context.execution = execution
    -
    -
    -def _update_node_state_if_necessary(task, is_transitional=False):
    +    with workflow_context.track_changes:
    +        execution = workflow_context.execution
    +        if execution.status == execution.PENDING:
    +            return _workflow_cancelled(workflow_context=workflow_context)
    +        # the execution may have already been finished
    +        elif execution.status in (execution.SUCCEEDED, execution.FAILED):
    +            _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
    +        else:
    +            execution.status = execution.CANCELLING
    +            workflow_context.execution = execution
    --- End diff --
    
    remove
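
The retry decision in `_task_failed` is the densest part of this hunk, so here is a minimal, self-contained sketch of it. Everything outside the branching logic is an assumption for illustration: the `Task` dataclass, the two exception stand-ins, the status strings, the `-1` value for `INFINITE_RETRIES`, and the explicit `now` argument (the diff calls `datetime.utcnow()` directly and wraps the updates in `ctx.track_changes`).

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional

INFINITE_RETRIES = -1  # assumed sentinel; the diff only shows the name


class TaskAbortException(Exception):
    """Stand-in for exceptions.TaskAbortException."""


class TaskRetryException(Exception):
    """Stand-in for exceptions.TaskRetryException."""

    def __init__(self, message=None, retry_interval=None):
        super().__init__(message)
        self.retry_interval = retry_interval


@dataclass
class Task:
    """Hypothetical task record holding only the fields the handler touches."""
    max_attempts: int = 1
    retry_interval: float = 0
    ignore_failure: bool = False
    attempts_count: int = 1
    status: str = 'pending'
    due_at: Optional[datetime] = None
    ended_at: Optional[datetime] = None


def handle_failure(task, exception, now):
    """Mark the task as retrying or failed, following the rules in the diff."""
    should_retry = all([
        # An explicit abort is never retried.
        not isinstance(exception, TaskAbortException),
        # Attempts remain, or the task retries without limit.
        task.attempts_count < task.max_attempts
        or task.max_attempts == INFINITE_RETRIES,
        # ignore_failure tasks are marked failed and not retried.
        not task.ignore_failure,
    ])
    if should_retry:
        retry_interval = None
        if isinstance(exception, TaskRetryException):
            retry_interval = exception.retry_interval
        if retry_interval is None:
            # Fall back to the interval configured on the task.
            retry_interval = task.retry_interval
        task.status = 'retrying'
        task.attempts_count += 1
        task.due_at = now + timedelta(seconds=retry_interval)
    else:
        task.ended_at = now
        task.status = 'failed'
    return task
```

Passing `now` in explicitly is a choice made for the sketch so the outcome can be checked deterministically; it is not something the PR does.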



[GitHub] incubator-ariatosca pull request #156: ARIA-278 remove core tasks

Posted by mxmrlv <gi...@git.apache.org>.
GitHub user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/156#discussion_r123010002
  
    --- Diff: aria/orchestrator/workflows/core/task.py ---
    @@ -0,0 +1,119 @@
    +# Licensed to the Apache Software Foundation (ASF) under one or more
    +# contributor license agreements.  See the NOTICE file distributed with
    +# this work for additional information regarding copyright ownership.
    --- End diff --
    
    compile



[GitHub] incubator-ariatosca pull request #156: ARIA-278 remove core tasks

Posted by mxmrlv <gi...@git.apache.org>.
GitHub user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/156#discussion_r122952290
  
    --- Diff: aria/orchestrator/workflow_runner.py ---
    @@ -80,15 +80,19 @@ def __init__(self, workflow_name, service_id, inputs,
                 task_max_attempts=task_max_attempts,
                 task_retry_interval=task_retry_interval)
     
    +        # Set default executor and kwargs
    +        executor = executor or ProcessExecutor(plugin_manager=plugin_manager)
    +
             # transforming the execution inputs to dict, to pass them to the workflow function
             execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values())
    -        self._tasks_graph = workflow_fn(ctx=workflow_context, **execution_inputs_dict)
     
    -        executor = executor or ProcessExecutor(plugin_manager=plugin_manager)
    -        self._engine = Engine(
    -            executor=executor,
    -            workflow_context=workflow_context,
    -            tasks_graph=self._tasks_graph)
    +        self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
    +        engine.construct_execution_tasks(self.execution, self._tasks_graph, executor.__class__)
    +
    +        # Update the state
    +        self._model_storage.execution.update(execution)
    --- End diff --
    
    remove



[GitHub] incubator-ariatosca pull request #156: ARIA-278 remove core tasks

Posted by mxmrlv <gi...@git.apache.org>.
GitHub user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/156#discussion_r122946023
  
    --- Diff: aria/modeling/orchestration.py ---
    @@ -296,30 +326,17 @@ def plugin(cls):
             return relationship.many_to_one(cls, 'plugin')
     
         @declared_attr
    -    def execution(cls):
    -        return relationship.many_to_one(cls, 'execution')
    -
    -    @declared_attr
         def arguments(cls):
             return relationship.one_to_many(cls, 'argument', dict_key='name')
     
         function = Column(String)
         max_attempts = Column(Integer, default=1)
         retry_interval = Column(Float, default=0)
         ignore_failure = Column(Boolean, default=False)
    +    interface_name = Column(String)
    +    operation_name = Column(String)
     
    -    # State
    -    status = Column(Enum(*STATES, name='status'), default=PENDING)
    -    due_at = Column(DateTime, nullable=False, index=True, default=datetime.utcnow())
    -    started_at = Column(DateTime, default=None)
    -    ended_at = Column(DateTime, default=None)
    -    attempts_count = Column(Integer, default=1)
    -
    -    def has_ended(self):
    -        return self.status in (self.SUCCESS, self.FAILED)
    -
    -    def is_waiting(self):
    -        return self.status in (self.PENDING, self.RETRYING)
    +    stub_type = Column(Enum(*STUB_TYPES))
    --- End diff --
    
    _stub_type



[GitHub] incubator-ariatosca pull request #156: ARIA-278 remove core tasks

Posted by mxmrlv <gi...@git.apache.org>.
Github user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/156#discussion_r122951095
  
    --- Diff: aria/orchestrator/workflows/core/engine.py ---
    @@ -38,84 +35,192 @@ class Engine(logger.LoggerMixin):
         The workflow engine. Executes workflows
         """
     
    -    def __init__(self, executor, workflow_context, tasks_graph, **kwargs):
    +    def __init__(self, default_executor, **kwargs):
             super(Engine, self).__init__(**kwargs)
    -        self._workflow_context = workflow_context
    -        self._execution_graph = networkx.DiGraph()
    -        translation.build_execution_graph(task_graph=tasks_graph,
    -                                          execution_graph=self._execution_graph,
    -                                          default_executor=executor)
    +        self._executors = {default_executor.__class__: default_executor}
    +        self._executing_tasks = []
     
    -    def execute(self):
    +    def execute(self, ctx):
             """
             execute the workflow
             """
             try:
    -            events.start_workflow_signal.send(self._workflow_context)
    +            events.start_workflow_signal.send(ctx)
                 while True:
    -                cancel = self._is_cancel()
    +                cancel = self._is_cancel(ctx)
                     if cancel:
                         break
    -                for task in self._ended_tasks():
    -                    self._handle_ended_tasks(task)
    -                for task in self._executable_tasks():
    -                    self._handle_executable_task(task)
    -                if self._all_tasks_consumed():
    +                for task in self._ended_tasks(ctx):
    +                    self._handle_ended_tasks(ctx, task)
    +                for task in self._executable_tasks(ctx):
    +                    self._handle_executable_task(ctx, task)
    +                if self._all_tasks_consumed(ctx):
                         break
                     else:
                         time.sleep(0.1)
                 if cancel:
    -                events.on_cancelled_workflow_signal.send(self._workflow_context)
    +                events.on_cancelled_workflow_signal.send(ctx)
                 else:
    -                events.on_success_workflow_signal.send(self._workflow_context)
    +                events.on_success_workflow_signal.send(ctx)
             except BaseException as e:
    -            events.on_failure_workflow_signal.send(self._workflow_context, exception=e)
    +            events.on_failure_workflow_signal.send(ctx, exception=e)
                 raise
     
    -    def cancel_execution(self):
    +    @staticmethod
    +    def cancel_execution(ctx):
             """
             Send a cancel request to the engine. If execution already started, execution status
             will be modified to 'cancelling' status. If execution is in pending mode, execution status
             will be modified to 'cancelled' directly.
             """
    -        events.on_cancelling_workflow_signal.send(self._workflow_context)
    +        events.on_cancelling_workflow_signal.send(ctx)
     
    -    def _is_cancel(self):
    -        return self._workflow_context.execution.status in (models.Execution.CANCELLING,
    -                                                           models.Execution.CANCELLED)
    +    @staticmethod
    +    def _is_cancel(ctx):
    +        execution = ctx.model.execution.update(ctx.execution)
    +        return execution.status in (models.Execution.CANCELLING, models.Execution.CANCELLED)
     
    -    def _executable_tasks(self):
    +    def _executable_tasks(self, ctx):
             now = datetime.utcnow()
    -        return (task for task in self._tasks_iter()
    -                if task.is_waiting() and
    -                task.due_at <= now and
    -                not self._task_has_dependencies(task))
    +        return (
    +            task for task in self._tasks_iter(ctx)
    +            if task.is_waiting() and task.due_at <= now and \
    +            not self._task_has_dependencies(ctx, task)
    +        )
     
    -    def _ended_tasks(self):
    -        return (task for task in self._tasks_iter() if task.has_ended())
    +    def _ended_tasks(self, ctx):
    +        for task in self._executing_tasks:
    +            if task.has_ended() and task in ctx._graph:
    +                yield task
     
    -    def _task_has_dependencies(self, task):
    -        return len(self._execution_graph.pred.get(task.id, {})) > 0
    -
    -    def _all_tasks_consumed(self):
    -        return len(self._execution_graph.node) == 0
    -
    -    def _tasks_iter(self):
    -        for _, data in self._execution_graph.nodes_iter(data=True):
    -            task = data['task']
    -            if isinstance(task, engine_task.OperationTask):
    -                if not task.model_task.has_ended():
    -                    self._workflow_context.model.task.refresh(task.model_task)
    -            yield task
    +    @staticmethod
    +    def _task_has_dependencies(ctx, task):
    +        return len(ctx._graph.pred.get(task, [])) > 0
     
         @staticmethod
    -    def _handle_executable_task(task):
    -        if isinstance(task, engine_task.OperationTask):
    -            events.sent_task_signal.send(task)
    -        task.execute()
    +    def _all_tasks_consumed(ctx):
    +        return len(ctx._graph.node) == 0
     
    -    def _handle_ended_tasks(self, task):
    +    @staticmethod
    +    def _tasks_iter(ctx):
    +        for task in ctx.execution.tasks:
    +            yield ctx.model.task.refresh(task)
    +
    +    def _handle_executable_task(self, ctx, task):
    +        if task._executor not in self._executors:
    --- End diff --
    
    do not instantiate
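
The selection rule in `_executable_tasks` (waiting, due, and with no predecessors left in the graph) can be sketched without the engine. This is an illustration under stated assumptions, not the engine's code: a plain dict of predecessor sets stands in for the graph the diff reads through `ctx._graph`, and the `Task` class, status strings, and `consume_ended` helper are hypothetical.

```python
from datetime import datetime


class Task(object):
    """Hypothetical task with just enough state for scheduling."""

    def __init__(self, name, due_at):
        self.name = name
        self.due_at = due_at
        self.status = 'pending'

    def is_waiting(self):
        return self.status in ('pending', 'retrying')

    def has_ended(self):
        return self.status in ('success', 'failed')


def executable_tasks(tasks, predecessors, now):
    """Return tasks that are waiting, due, and have no remaining predecessors."""
    return [
        task for task in tasks
        if task.is_waiting()
        and task.due_at <= now
        and not predecessors.get(task.name)
    ]


def consume_ended(task, predecessors):
    """Drop an ended task from the graph so its dependents become runnable."""
    predecessors.pop(task.name, None)
    for deps in predecessors.values():
        deps.discard(task.name)


def all_tasks_consumed(predecessors):
    """The loop in the diff stops once the graph has no nodes left."""
    return len(predecessors) == 0
```

With two tasks where `b` depends on `a`, only `a` is selected at first; once `a` ends and is consumed, `b` becomes executable, and the loop ends when the graph is empty.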



[GitHub] incubator-ariatosca pull request #156: ARIA-278 remove core tasks

Posted by mxmrlv <gi...@git.apache.org>.
Github user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/156#discussion_r122951228
  
    --- Diff: aria/orchestrator/workflows/core/engine.py ---
    @@ -38,84 +35,192 @@ class Engine(logger.LoggerMixin):
         The workflow engine. Executes workflows
         """
     
    -    def __init__(self, executor, workflow_context, tasks_graph, **kwargs):
    +    def __init__(self, default_executor, **kwargs):
             super(Engine, self).__init__(**kwargs)
    -        self._workflow_context = workflow_context
    -        self._execution_graph = networkx.DiGraph()
    -        translation.build_execution_graph(task_graph=tasks_graph,
    -                                          execution_graph=self._execution_graph,
    -                                          default_executor=executor)
    +        self._executors = {default_executor.__class__: default_executor}
    +        self._executing_tasks = []
     
    -    def execute(self):
    +    def execute(self, ctx):
             """
             execute the workflow
             """
             try:
    -            events.start_workflow_signal.send(self._workflow_context)
    +            events.start_workflow_signal.send(ctx)
                 while True:
    -                cancel = self._is_cancel()
    +                cancel = self._is_cancel(ctx)
                     if cancel:
                         break
    -                for task in self._ended_tasks():
    -                    self._handle_ended_tasks(task)
    -                for task in self._executable_tasks():
    -                    self._handle_executable_task(task)
    -                if self._all_tasks_consumed():
    +                for task in self._ended_tasks(ctx):
    +                    self._handle_ended_tasks(ctx, task)
    +                for task in self._executable_tasks(ctx):
    +                    self._handle_executable_task(ctx, task)
    +                if self._all_tasks_consumed(ctx):
                         break
                     else:
                         time.sleep(0.1)
                 if cancel:
    -                events.on_cancelled_workflow_signal.send(self._workflow_context)
    +                events.on_cancelled_workflow_signal.send(ctx)
                 else:
    -                events.on_success_workflow_signal.send(self._workflow_context)
    +                events.on_success_workflow_signal.send(ctx)
             except BaseException as e:
    -            events.on_failure_workflow_signal.send(self._workflow_context, exception=e)
    +            events.on_failure_workflow_signal.send(ctx, exception=e)
                 raise
     
    -    def cancel_execution(self):
    +    @staticmethod
    +    def cancel_execution(ctx):
             """
             Send a cancel request to the engine. If execution already started, execution status
             will be modified to 'cancelling' status. If execution is in pending mode, execution status
             will be modified to 'cancelled' directly.
             """
    -        events.on_cancelling_workflow_signal.send(self._workflow_context)
    +        events.on_cancelling_workflow_signal.send(ctx)
     
    -    def _is_cancel(self):
    -        return self._workflow_context.execution.status in (models.Execution.CANCELLING,
    -                                                           models.Execution.CANCELLED)
    +    @staticmethod
    +    def _is_cancel(ctx):
    +        execution = ctx.model.execution.update(ctx.execution)
    +        return execution.status in (models.Execution.CANCELLING, models.Execution.CANCELLED)
     
    -    def _executable_tasks(self):
    +    def _executable_tasks(self, ctx):
             now = datetime.utcnow()
    -        return (task for task in self._tasks_iter()
    -                if task.is_waiting() and
    -                task.due_at <= now and
    -                not self._task_has_dependencies(task))
    +        return (
    +            task for task in self._tasks_iter(ctx)
    +            if task.is_waiting() and task.due_at <= now and \
    +            not self._task_has_dependencies(ctx, task)
    +        )
     
    -    def _ended_tasks(self):
    -        return (task for task in self._tasks_iter() if task.has_ended())
    +    def _ended_tasks(self, ctx):
    +        for task in self._executing_tasks:
    +            if task.has_ended() and task in ctx._graph:
    +                yield task
     
    -    def _task_has_dependencies(self, task):
    -        return len(self._execution_graph.pred.get(task.id, {})) > 0
    -
    -    def _all_tasks_consumed(self):
    -        return len(self._execution_graph.node) == 0
    -
    -    def _tasks_iter(self):
    -        for _, data in self._execution_graph.nodes_iter(data=True):
    -            task = data['task']
    -            if isinstance(task, engine_task.OperationTask):
    -                if not task.model_task.has_ended():
    -                    self._workflow_context.model.task.refresh(task.model_task)
    -            yield task
    +    @staticmethod
    +    def _task_has_dependencies(ctx, task):
    +        return len(ctx._graph.pred.get(task, [])) > 0
     
         @staticmethod
    -    def _handle_executable_task(task):
    -        if isinstance(task, engine_task.OperationTask):
    -            events.sent_task_signal.send(task)
    -        task.execute()
    +    def _all_tasks_consumed(ctx):
    +        return len(ctx._graph.node) == 0
     
    -    def _handle_ended_tasks(self, task):
    +    @staticmethod
    +    def _tasks_iter(ctx):
    +        for task in ctx.execution.tasks:
    +            yield ctx.model.task.refresh(task)
    +
    +    def _handle_executable_task(self, ctx, task):
    +        if task._executor not in self._executors:
    +            self._executors[task._executor] = task._executor()
    +        task_executor = self._executors[task._executor]
    +
    +        context_cls = task._context_cls or operation.BaseOperationContext
    --- End diff --
    
    task.stub_type



[GitHub] incubator-ariatosca pull request #156: ARIA-278 remove core tasks

Posted by mxmrlv <gi...@git.apache.org>.
Github user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/156#discussion_r122948878
  
    --- Diff: aria/orchestrator/workflow_runner.py ---
    @@ -80,15 +80,19 @@ def __init__(self, workflow_name, service_id, inputs,
                 task_max_attempts=task_max_attempts,
                 task_retry_interval=task_retry_interval)
     
    +        # Set default executor and kwargs
    +        executor = executor or ProcessExecutor(plugin_manager=plugin_manager)
    +
             # transforming the execution inputs to dict, to pass them to the workflow function
             execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values())
    -        self._tasks_graph = workflow_fn(ctx=workflow_context, **execution_inputs_dict)
     
    -        executor = executor or ProcessExecutor(plugin_manager=plugin_manager)
    -        self._engine = Engine(
    -            executor=executor,
    -            workflow_context=workflow_context,
    -            tasks_graph=self._tasks_graph)
    +        self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
    +        engine.construct_execution_tasks(self.execution, self._tasks_graph, executor.__class__)
    --- End diff --
    
    new method 90 + 93



[GitHub] incubator-ariatosca pull request #156: ARIA-278 remove core tasks

Posted by mxmrlv <gi...@git.apache.org>.
GitHub user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/156#discussion_r122952484
  
    --- Diff: aria/orchestrator/workflows/core/engine.py ---
    @@ -38,84 +35,192 @@ class Engine(logger.LoggerMixin):
         The workflow engine. Executes workflows
         """
     
    -    def __init__(self, executor, workflow_context, tasks_graph, **kwargs):
    +    def __init__(self, default_executor, **kwargs):
             super(Engine, self).__init__(**kwargs)
    -        self._workflow_context = workflow_context
    -        self._execution_graph = networkx.DiGraph()
    -        translation.build_execution_graph(task_graph=tasks_graph,
    -                                          execution_graph=self._execution_graph,
    -                                          default_executor=executor)
    +        self._executors = {default_executor.__class__: default_executor}
    +        self._executing_tasks = []
     
    -    def execute(self):
    +    def execute(self, ctx):
             """
             execute the workflow
             """
             try:
    -            events.start_workflow_signal.send(self._workflow_context)
    +            events.start_workflow_signal.send(ctx)
                 while True:
    -                cancel = self._is_cancel()
    +                cancel = self._is_cancel(ctx)
                     if cancel:
                         break
    -                for task in self._ended_tasks():
    -                    self._handle_ended_tasks(task)
    -                for task in self._executable_tasks():
    -                    self._handle_executable_task(task)
    -                if self._all_tasks_consumed():
    +                for task in self._ended_tasks(ctx):
    +                    self._handle_ended_tasks(ctx, task)
    +                for task in self._executable_tasks(ctx):
    +                    self._handle_executable_task(ctx, task)
    +                if self._all_tasks_consumed(ctx):
                         break
                     else:
                         time.sleep(0.1)
                 if cancel:
    -                events.on_cancelled_workflow_signal.send(self._workflow_context)
    +                events.on_cancelled_workflow_signal.send(ctx)
                 else:
    -                events.on_success_workflow_signal.send(self._workflow_context)
    +                events.on_success_workflow_signal.send(ctx)
             except BaseException as e:
    -            events.on_failure_workflow_signal.send(self._workflow_context, exception=e)
    +            events.on_failure_workflow_signal.send(ctx, exception=e)
                 raise
     
    -    def cancel_execution(self):
    +    @staticmethod
    +    def cancel_execution(ctx):
             """
             Send a cancel request to the engine. If execution already started, execution status
             will be modified to 'cancelling' status. If execution is in pending mode, execution status
             will be modified to 'cancelled' directly.
             """
    -        events.on_cancelling_workflow_signal.send(self._workflow_context)
    +        events.on_cancelling_workflow_signal.send(ctx)
     
    -    def _is_cancel(self):
    -        return self._workflow_context.execution.status in (models.Execution.CANCELLING,
    -                                                           models.Execution.CANCELLED)
    +    @staticmethod
    +    def _is_cancel(ctx):
    +        execution = ctx.model.execution.update(ctx.execution)
    +        return execution.status in (models.Execution.CANCELLING, models.Execution.CANCELLED)
     
    -    def _executable_tasks(self):
    +    def _executable_tasks(self, ctx):
             now = datetime.utcnow()
    -        return (task for task in self._tasks_iter()
    -                if task.is_waiting() and
    -                task.due_at <= now and
    -                not self._task_has_dependencies(task))
    +        return (
    +            task for task in self._tasks_iter(ctx)
    +            if task.is_waiting() and task.due_at <= now and \
    +            not self._task_has_dependencies(ctx, task)
    +        )
     
    -    def _ended_tasks(self):
    -        return (task for task in self._tasks_iter() if task.has_ended())
    +    def _ended_tasks(self, ctx):
    +        for task in self._executing_tasks:
    +            if task.has_ended() and task in ctx._graph:
    +                yield task
     
    -    def _task_has_dependencies(self, task):
    -        return len(self._execution_graph.pred.get(task.id, {})) > 0
    -
    -    def _all_tasks_consumed(self):
    -        return len(self._execution_graph.node) == 0
    -
    -    def _tasks_iter(self):
    -        for _, data in self._execution_graph.nodes_iter(data=True):
    -            task = data['task']
    -            if isinstance(task, engine_task.OperationTask):
    -                if not task.model_task.has_ended():
    -                    self._workflow_context.model.task.refresh(task.model_task)
    -            yield task
    +    @staticmethod
    +    def _task_has_dependencies(ctx, task):
    +        return len(ctx._graph.pred.get(task, [])) > 0
     
         @staticmethod
    -    def _handle_executable_task(task):
    -        if isinstance(task, engine_task.OperationTask):
    -            events.sent_task_signal.send(task)
    -        task.execute()
    +    def _all_tasks_consumed(ctx):
    +        return len(ctx._graph.node) == 0
     
    -    def _handle_ended_tasks(self, task):
    +    @staticmethod
    +    def _tasks_iter(ctx):
    +        for task in ctx.execution.tasks:
    +            yield ctx.model.task.refresh(task)
    +
    +    def _handle_executable_task(self, ctx, task):
    +        if task._executor not in self._executors:
    +            self._executors[task._executor] = task._executor()
    +        task_executor = self._executors[task._executor]
    +
    +        context_cls = task._context_cls or operation.BaseOperationContext
    +        op_ctx = context_cls(
    +            model_storage=ctx.model,
    +            resource_storage=ctx.resource,
    +            workdir=ctx._workdir,
    +            task_id=task.id,
    +            actor_id=task.actor.id if task.actor else None,
    +            service_id=task.execution.service.id,
    +            execution_id=task.execution.id,
    +            name=task.name
    +        )
    +
    +        self._executing_tasks.append(task)
    +
    +        if not task.stub_type:
    +            events.sent_task_signal.send(op_ctx)
    +        task_executor.execute(op_ctx)
    +
    +    def _handle_ended_tasks(self, ctx, task):
    +        self._executing_tasks.remove(task)
             if task.status == models.Task.FAILED and not task.ignore_failure:
                 raise exceptions.ExecutorException('Workflow failed')
             else:
    -            self._execution_graph.remove_node(task.id)
    +            ctx._graph.remove_node(task)
    +
    +
    +def construct_execution_tasks(execution,
    +                              task_graph,
    +                              default_executor,
    +                              stub_executor=executor.base.StubTaskExecutor,
    +                              start_stub_type=models.Task.START_WORKFLOW,
    +                              end_stub_type=models.Task.END_WORKFLOW,
    +                              depends_on=()):
    +    """
    +    Translates the user graph to the execution graph
    +    :param task_graph: The user's graph
    +    :param start_stub_type: internal use
    +    :param end_stub_type: internal use
    +    :param depends_on: internal use
    +    """
    +    depends_on = list(depends_on)
    +
    +    # Insert start marker
    +    start_task = models.Task(api_id=_start_graph_suffix(task_graph.id),
    +                             _executor=stub_executor,
    +                             execution=execution,
    +                             stub_type=start_stub_type,
    +                             dependencies=depends_on)
    +
    +    for task in task_graph.topological_order(reverse=True):
    +        operation_dependencies = _get_tasks_from_dependencies(
    +            execution, task_graph.get_dependencies(task), [start_task])
    +
    +        if isinstance(task, api.task.OperationTask):
    +            models.Task.from_api_task(api_task=task,
    +                                      executor=default_executor,
    +                                      dependencies=operation_dependencies)
    +
    +        elif isinstance(task, api.task.WorkflowTask):
    +            # Build the graph recursively while adding start and end markers
    +            construct_execution_tasks(
    +                execution=execution,
    +                task_graph=task,
    +                default_executor=default_executor,
    +                stub_executor=stub_executor,
    +                start_stub_type=models.Task.START_SUBWROFKLOW,
    +                end_stub_type=models.Task.END_SUBWORKFLOW,
    +                depends_on=operation_dependencies
    +            )
    +        elif isinstance(task, api.task.StubTask):
    +            models.Task(api_id=task.id,
    +                        _executor=stub_executor,
    +                        execution=execution,
    +                        stub_type=models.Task.STUB,
    +                        dependencies=operation_dependencies)
    +        else:
    +            raise RuntimeError('Undefined state')
    +
    +    # Insert end marker
    +    models.Task(api_id=_end_graph_suffix(task_graph.id),
    +                _executor=stub_executor,
    +                execution=execution,
    +                stub_type=end_stub_type,
    +                dependencies=_get_non_dependent_tasks(execution) or [start_task])
    +
    +
    +def _start_graph_suffix(api_id):
    +    return '{0}-Start'.format(api_id)
    +
    +
    +def _end_graph_suffix(api_id):
    +    return '{0}-End'.format(api_id)
    +
    +
    +def _get_non_dependent_tasks(execution):
    +    dependency_tasks = set()
    --- End diff --
    
    Rename `dependency_tasks` to `tasks_with_dependencies`.
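
For illustration only: the helper that is cut off at the end of the diff above presumably finds the tasks that no other task depends on, since those are what the end marker has to wait for. A minimal sketch under that assumption; `SketchTask` and `get_non_dependent_tasks` are hypothetical stand-ins with a plain `dependencies` list, not the PR's models:

```python
class SketchTask(object):
    """Hypothetical stand-in for a task model with a `dependencies` list."""

    def __init__(self, name, dependencies=()):
        self.name = name
        self.dependencies = list(dependencies)


def get_non_dependent_tasks(tasks):
    """Return the tasks that no other task lists as a dependency."""
    tasks_with_dependents = set()
    for task in tasks:
        tasks_with_dependents.update(task.dependencies)
    return [task for task in tasks if task not in tasks_with_dependents]


# start <- create <- configure: only `configure` has nothing depending on it.
start = SketchTask('start')
create = SketchTask('create', [start])
configure = SketchTask('configure', [create])
leaves = get_non_dependent_tasks([start, create, configure])
print([task.name for task in leaves])
```

If the result is empty (an empty graph), the diff falls back to `[start_task]`, so the end marker still runs after the start marker.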


---

[GitHub] incubator-ariatosca pull request #156: ARIA-278 remove core tasks

Posted by mxmrlv <gi...@git.apache.org>.
GitHub user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/156#discussion_r122950583
  
    --- Diff: aria/orchestrator/workflows/core/engine.py ---
    @@ -38,84 +35,192 @@ class Engine(logger.LoggerMixin):
         The workflow engine. Executes workflows
         """
     
    -    def __init__(self, executor, workflow_context, tasks_graph, **kwargs):
    +    def __init__(self, default_executor, **kwargs):
             super(Engine, self).__init__(**kwargs)
    -        self._workflow_context = workflow_context
    -        self._execution_graph = networkx.DiGraph()
    -        translation.build_execution_graph(task_graph=tasks_graph,
    -                                          execution_graph=self._execution_graph,
    -                                          default_executor=executor)
    +        self._executors = {default_executor.__class__: default_executor}
    +        self._executing_tasks = []
     
    -    def execute(self):
    +    def execute(self, ctx):
             """
             execute the workflow
             """
             try:
    -            events.start_workflow_signal.send(self._workflow_context)
    +            events.start_workflow_signal.send(ctx)
                 while True:
    -                cancel = self._is_cancel()
    +                cancel = self._is_cancel(ctx)
                     if cancel:
                         break
    -                for task in self._ended_tasks():
    -                    self._handle_ended_tasks(task)
    -                for task in self._executable_tasks():
    -                    self._handle_executable_task(task)
    -                if self._all_tasks_consumed():
    +                for task in self._ended_tasks(ctx):
    +                    self._handle_ended_tasks(ctx, task)
    +                for task in self._executable_tasks(ctx):
    +                    self._handle_executable_task(ctx, task)
    +                if self._all_tasks_consumed(ctx):
                         break
                     else:
                         time.sleep(0.1)
                 if cancel:
    -                events.on_cancelled_workflow_signal.send(self._workflow_context)
    +                events.on_cancelled_workflow_signal.send(ctx)
                 else:
    -                events.on_success_workflow_signal.send(self._workflow_context)
    +                events.on_success_workflow_signal.send(ctx)
             except BaseException as e:
    -            events.on_failure_workflow_signal.send(self._workflow_context, exception=e)
    +            events.on_failure_workflow_signal.send(ctx, exception=e)
                 raise
     
    -    def cancel_execution(self):
    +    @staticmethod
    +    def cancel_execution(ctx):
             """
             Send a cancel request to the engine. If execution already started, execution status
             will be modified to 'cancelling' status. If execution is in pending mode, execution status
             will be modified to 'cancelled' directly.
             """
    -        events.on_cancelling_workflow_signal.send(self._workflow_context)
    +        events.on_cancelling_workflow_signal.send(ctx)
     
    -    def _is_cancel(self):
    -        return self._workflow_context.execution.status in (models.Execution.CANCELLING,
    -                                                           models.Execution.CANCELLED)
    +    @staticmethod
    +    def _is_cancel(ctx):
    +        execution = ctx.model.execution.update(ctx.execution)
    --- End diff --
    
    Use `refresh` here instead of `update`.
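
For illustration only: the polling loop in the diff above can be sketched with an in-memory graph. This is a simplified sketch, not the engine itself. `run`, `graph`, `statuses`, `execute` and `is_cancelled` are hypothetical names; `is_cancelled` stands in for re-reading the execution status from storage on every pass, and the `time.sleep(0.1)` between passes is omitted because the example executes tasks synchronously:

```python
def run(graph, statuses, execute, is_cancelled):
    """
    graph: dict mapping task name -> set of task names it still waits on
    statuses: dict mapping task name -> 'pending', 'sent', 'success' or 'failed'
    """
    while True:
        if is_cancelled():
            return 'cancelled'
        # Consume ended tasks: drop them from the graph and from other tasks' waits.
        for name in [n for n in graph if statuses[n] in ('success', 'failed')]:
            if statuses[name] == 'failed':
                raise RuntimeError('Workflow failed')
            del graph[name]
            for waits in graph.values():
                waits.discard(name)
        # Send every pending task that has nothing left to wait on.
        for name in [n for n in graph if statuses[n] == 'pending' and not graph[n]]:
            statuses[name] = 'sent'
            execute(name, statuses)
        if not graph:
            return 'success'


order = []


def execute(name, statuses):
    # Synchronous stand-in for an executor: record the call and succeed at once.
    order.append(name)
    statuses[name] = 'success'


graph = {'start': set(), 'create': {'start'}, 'end': {'create'}}
statuses = dict((name, 'pending') for name in graph)
result = run(graph, statuses, execute, is_cancelled=lambda: False)
print(result, order)
```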


---

[GitHub] incubator-ariatosca pull request #156: ARIA-278 remove core tasks

Posted by mxmrlv <gi...@git.apache.org>.
GitHub user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/156#discussion_r122956604
  
    --- Diff: tests/orchestrator/workflows/executor/__init__.py ---
    @@ -12,69 +12,80 @@
     # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     # See the License for the specific language governing permissions and
     # limitations under the License.
    -import uuid
     import logging
    -from collections import namedtuple
    +import uuid
     from contextlib import contextmanager
     
     import aria
     from aria.modeling import models
     
     
    +class MockContext(object):
    +
    +    def __init__(self, storage, task_kwargs=None):
    +        self.logger = logging.getLogger('mock_logger')
    +        self._task_kwargs = task_kwargs or {}
    +        self._storage = storage
    +        self.task = MockTask(storage, **task_kwargs)
    +        self.states = []
    +        self.exception = None
    +
    +    @property
    +    def serialization_dict(self):
    +        return {
    +            'context_cls': self.__class__,
    +            'context': {
    +                'storage_kwargs': self._storage.serialization_dict,
    +                'task_kwargs': self._task_kwargs
    +            }
    +        }
    +
    +    def __getattr__(self, item):
    +        return None
    +
    +    def close(self):
    +        pass
    +
    +    @classmethod
    +    def instantiate_from_dict(cls, storage_kwargs=None, task_kwargs=None):
    +        return cls(storage=aria.application_model_storage(**(storage_kwargs or {})),
    +                   task_kwargs=(task_kwargs or {}))
    +
    +    @property
    +    @contextmanager
    +    def track_changes(self):
    --- End diff --
    
    Rename `track_changes` to `persist_changes`.
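
For illustration only: stacking `@property` on `@contextmanager`, as the mock above does, is what lets callers write `with ctx.track_changes:` without parentheses. A minimal sketch of the pattern; `SketchContext` and its `events` list are hypothetical and only record the order of calls:

```python
from contextlib import contextmanager


class SketchContext(object):
    """Hypothetical context exposing a `with ctx.track_changes:` block."""

    def __init__(self):
        self.events = []

    @property
    @contextmanager
    def track_changes(self):
        # Each attribute access builds a fresh context manager; the body
        # below only starts running when the `with` block is entered.
        self.events.append('enter')
        try:
            yield
        finally:
            self.events.append('exit')


ctx = SketchContext()
with ctx.track_changes:
    ctx.events.append('work')
print(ctx.events)
```

The property is found before `__getattr__` is consulted, so a catch-all `__getattr__` that returns `None`, like the one in the mock, does not shadow it.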


---

[GitHub] incubator-ariatosca pull request #156: ARIA-278 remove core tasks

Posted by mxmrlv <gi...@git.apache.org>.
GitHub user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/156#discussion_r122945481
  
    --- Diff: aria/modeling/orchestration.py ---
    @@ -276,10 +289,27 @@ class TaskBase(ModelMixin):
             SUCCESS,
             FAILED,
         )
    -
         INFINITE_RETRIES = -1
     
         @declared_attr
    +    def execution(cls):
    +        return relationship.many_to_one(cls, 'execution')
    +
    +    @declared_attr
    +    def execution_fk(cls):
    +        return relationship.foreign_key('execution', nullable=True)
    +
    +    status = Column(Enum(*STATES, name='status'), default=PENDING)
    +    due_at = Column(DateTime, nullable=False, index=True, default=datetime.utcnow())
    +    started_at = Column(DateTime, default=None)
    +    ended_at = Column(DateTime, default=None)
    +    attempts_count = Column(Integer, default=1)
    +    api_id = Column(String)
    --- End diff --
    
    Rename `api_id` to `_api_id`.
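
A side note on the same diff, for illustration only: `default=datetime.utcnow()` is evaluated once, when the class body runs, so every row would share the same `due_at` default. SQLAlchemy's `Column` also accepts a callable as `default` and calls it at insert time. The pure-Python sketch below shows the difference; `fake_now` and `resolve_default` are hypothetical stand-ins (a deterministic clock, and roughly what an ORM does with a column default), not SQLAlchemy's API:

```python
import itertools

_clock = itertools.count(1)


def fake_now():
    """Deterministic stand-in for datetime.utcnow(): returns 1, 2, 3, ..."""
    return next(_clock)


def resolve_default(default):
    """Call the default if it is callable, otherwise use it as-is."""
    return default() if callable(default) else default


eager_default = fake_now()   # evaluated once, like default=datetime.utcnow()
lazy_default = fake_now      # evaluated per row, like default=datetime.utcnow

eager_rows = [resolve_default(eager_default) for _ in range(3)]
lazy_rows = [resolve_default(lazy_default) for _ in range(3)]
print(eager_rows, lazy_rows)
```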


---

[GitHub] incubator-ariatosca pull request #156: ARIA-278 remove core tasks

Posted by mxmrlv <gi...@git.apache.org>.
GitHub user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/156#discussion_r122950023
  
    --- Diff: aria/orchestrator/workflows/core/engine.py ---
    @@ -38,84 +35,192 @@ class Engine(logger.LoggerMixin):
         The workflow engine. Executes workflows
         """
     
    -    def __init__(self, executor, workflow_context, tasks_graph, **kwargs):
    +    def __init__(self, default_executor, **kwargs):
             super(Engine, self).__init__(**kwargs)
    -        self._workflow_context = workflow_context
    -        self._execution_graph = networkx.DiGraph()
    -        translation.build_execution_graph(task_graph=tasks_graph,
    -                                          execution_graph=self._execution_graph,
    -                                          default_executor=executor)
    +        self._executors = {default_executor.__class__: default_executor}
    +        self._executing_tasks = []
    --- End diff --
    
    Move this initialization into the `execute` method.
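
For illustration only: the `_executors` dictionary in the diff above is keyed by executor class, which suggests one shared instance per class, created on first use. A minimal sketch of that idea; `SketchEngine`, `executor_for`, `ProcessLike` and `StubLike` are hypothetical names, not the PR's code:

```python
class SketchEngine(object):
    """Hypothetical engine that reuses one executor instance per executor class."""

    def __init__(self, default_executor):
        self._executors = {default_executor.__class__: default_executor}

    def executor_for(self, executor_cls):
        # Instantiate lazily on first use, then reuse the same instance.
        if executor_cls not in self._executors:
            self._executors[executor_cls] = executor_cls()
        return self._executors[executor_cls]


class ProcessLike(object):
    """Stand-in for the default executor class."""


class StubLike(object):
    """Stand-in for an executor class that is only needed by some tasks."""


default = ProcessLike()
engine = SketchEngine(default)
same_default = engine.executor_for(ProcessLike) is default
same_stub = engine.executor_for(StubLike) is engine.executor_for(StubLike)
print(same_default, same_stub)
```

One consequence of this design is that lazily created executors are built with no arguments, so only the default executor can carry custom configuration.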


---

[GitHub] incubator-ariatosca pull request #156: ARIA-278 remove core tasks

Posted by mxmrlv <gi...@git.apache.org>.
GitHub user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/156#discussion_r122947448
  
    --- Diff: aria/modeling/orchestration.py ---
    @@ -392,8 +397,66 @@ def abort(message=None):
         def retry(message=None, retry_interval=None):
             raise TaskRetryException(message, retry_interval=retry_interval)
     
    +    @declared_attr
    +    def dependency_fk(self):
    +        return relationship.foreign_key('task', nullable=True)
    +
    +    @declared_attr
    +    def dependencies(cls):
    +        # symmetric relationship causes funky graphs
    +        return relationship.one_to_many_self(cls, 'dependency_fk')
    +
    +    def has_ended(self):
    +        return self.status in (self.SUCCESS, self.FAILED)
    +
    +    def is_waiting(self):
    +        if self.stub_type:
    +            return not self.has_ended()
    +        else:
    +            return self.status in (self.PENDING, self.RETRYING)
    +
    +    @classmethod
    +    def from_api_task(cls, api_task, executor, **kwargs):
    +        from aria.orchestrator import context
    +        instantiation_kwargs = {}
    +
    +        if hasattr(api_task.actor, 'outbound_relationships'):
    --- End diff --
    
    maybe api_task has `node` attribute


---

[GitHub] incubator-ariatosca pull request #156: ARIA-278 remove core tasks

Posted by mxmrlv <gi...@git.apache.org>.
GitHub user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/156#discussion_r122956391
  
    --- Diff: aria/orchestrator/workflows/executor/base.py ---
    @@ -28,19 +28,20 @@ class BaseExecutor(logger.LoggerMixin):
         def _execute(self, task):
             raise NotImplementedError
     
    -    def execute(self, task):
    +    def execute(self, ctx):
             """
             Execute a task
             :param task: task to execute
             """
    -        if task.function:
    -            self._execute(task)
    -        else:
    -            # In this case the task is missing a function. This task still gets to an
    -            # executor, but since there is nothing to run, we by default simply skip the execution
    -            # itself.
    -            self._task_started(task)
    -            self._task_succeeded(task)
    +        with ctx.track_changes:
    --- End diff --
    
    reconsider


---

[GitHub] incubator-ariatosca pull request #156: ARIA-278 remove core tasks

Posted by mxmrlv <gi...@git.apache.org>.
GitHub user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/156#discussion_r122954646
  
    --- Diff: aria/orchestrator/workflows/events_logging.py ---
    @@ -34,54 +34,67 @@ def _get_task_name(task):
     
     
     @events.start_task_signal.connect
    -def _start_task_handler(task, **kwargs):
    -    # If the task has no function this is an empty task.
    -    if task.function:
    -        suffix = 'started...'
    -        logger = task.context.logger.info
    -    else:
    -        suffix = 'has no implementation'
    -        logger = task.context.logger.debug
    +def _start_task_handler(ctx, **kwargs):
    +    with ctx.track_changes:
    +        # If the task has no function this is an empty task.
    +        if ctx.task.function:
    +            suffix = 'started...'
    +            logger = ctx.logger.info
    +        else:
    +            suffix = 'has no implementation'
    +            logger = ctx.logger.debug
    +
    +        logger('{name} {task.interface_name}.{task.operation_name} {suffix}'.format(
    +            name=_get_task_name(ctx.task), task=ctx.task, suffix=suffix))
     
    -    logger('{name} {task.interface_name}.{task.operation_name} {suffix}'.format(
    -        name=_get_task_name(task), task=task, suffix=suffix))
     
     @events.on_success_task_signal.connect
    -def _success_task_handler(task, **kwargs):
    -    if not task.function:
    -        return
    -    task.context.logger.info('{name} {task.interface_name}.{task.operation_name} successful'
    -                             .format(name=_get_task_name(task), task=task))
    +def _success_task_handler(ctx, **kwargs):
    +    with ctx.track_changes:
    +        if not ctx.task.function:
    +            return
    +        ctx.logger.info('{name} {task.interface_name}.{task.operation_name} successful'
    +                        .format(name=_get_task_name(ctx.task), task=ctx.task))
     
     
     @events.on_failure_task_signal.connect
    -def _failure_operation_handler(task, traceback, **kwargs):
    -    task.context.logger.error(
    -        '{name} {task.interface_name}.{task.operation_name} failed'
    -        .format(name=_get_task_name(task), task=task), extra=dict(traceback=traceback)
    -    )
    +def _failure_operation_handler(ctx, traceback, **kwargs):
    +    with ctx.track_changes:
    +        ctx.logger.error(
    +            '{name} {task.interface_name}.{task.operation_name} failed'
    +            .format(name=_get_task_name(ctx.task), task=ctx.task), extra=dict(traceback=traceback)
    +        )
     
     
     @events.start_workflow_signal.connect
     def _start_workflow_handler(context, **kwargs):
    -    context.logger.info("Starting '{ctx.workflow_name}' workflow execution".format(ctx=context))
    +    with context.track_changes:
    +        context.logger.info("Starting '{ctx.workflow_name}' workflow execution".format(ctx=context))
     
     
     @events.on_failure_workflow_signal.connect
     def _failure_workflow_handler(context, **kwargs):
    -    context.logger.info("'{ctx.workflow_name}' workflow execution failed".format(ctx=context))
    +    with context.track_changes:
    +        context.logger.info("'{ctx.workflow_name}' workflow execution failed".format(ctx=context))
     
     
     @events.on_success_workflow_signal.connect
     def _success_workflow_handler(context, **kwargs):
    -    context.logger.info("'{ctx.workflow_name}' workflow execution succeeded".format(ctx=context))
    +    with context.track_changes:
    --- End diff --
    
    remove


---

[GitHub] incubator-ariatosca pull request #156: ARIA-278 remove core tasks

Posted by mxmrlv <gi...@git.apache.org>.
GitHub user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/156#discussion_r122953127
  
    --- Diff: aria/orchestrator/workflows/core/engine.py ---
    @@ -38,84 +35,192 @@ class Engine(logger.LoggerMixin):
         The workflow engine. Executes workflows
         """
     
    -    def __init__(self, executor, workflow_context, tasks_graph, **kwargs):
    +    def __init__(self, default_executor, **kwargs):
             super(Engine, self).__init__(**kwargs)
    -        self._workflow_context = workflow_context
    -        self._execution_graph = networkx.DiGraph()
    -        translation.build_execution_graph(task_graph=tasks_graph,
    -                                          execution_graph=self._execution_graph,
    -                                          default_executor=executor)
    +        self._executors = {default_executor.__class__: default_executor}
    +        self._executing_tasks = []
     
    -    def execute(self):
    +    def execute(self, ctx):
             """
             execute the workflow
             """
             try:
    -            events.start_workflow_signal.send(self._workflow_context)
    +            events.start_workflow_signal.send(ctx)
                 while True:
    -                cancel = self._is_cancel()
    +                cancel = self._is_cancel(ctx)
                     if cancel:
                         break
    -                for task in self._ended_tasks():
    -                    self._handle_ended_tasks(task)
    -                for task in self._executable_tasks():
    -                    self._handle_executable_task(task)
    -                if self._all_tasks_consumed():
    +                for task in self._ended_tasks(ctx):
    +                    self._handle_ended_tasks(ctx, task)
    +                for task in self._executable_tasks(ctx):
    +                    self._handle_executable_task(ctx, task)
    +                if self._all_tasks_consumed(ctx):
                         break
                     else:
                         time.sleep(0.1)
                 if cancel:
    -                events.on_cancelled_workflow_signal.send(self._workflow_context)
    +                events.on_cancelled_workflow_signal.send(ctx)
                 else:
    -                events.on_success_workflow_signal.send(self._workflow_context)
    +                events.on_success_workflow_signal.send(ctx)
             except BaseException as e:
    -            events.on_failure_workflow_signal.send(self._workflow_context, exception=e)
    +            events.on_failure_workflow_signal.send(ctx, exception=e)
                 raise
     
    -    def cancel_execution(self):
    +    @staticmethod
    +    def cancel_execution(ctx):
             """
             Send a cancel request to the engine. If execution already started, execution status
             will be modified to 'cancelling' status. If execution is in pending mode, execution status
             will be modified to 'cancelled' directly.
             """
    -        events.on_cancelling_workflow_signal.send(self._workflow_context)
    +        events.on_cancelling_workflow_signal.send(ctx)
     
    -    def _is_cancel(self):
    -        return self._workflow_context.execution.status in (models.Execution.CANCELLING,
    -                                                           models.Execution.CANCELLED)
    +    @staticmethod
    +    def _is_cancel(ctx):
    +        execution = ctx.model.execution.update(ctx.execution)
    +        return execution.status in (models.Execution.CANCELLING, models.Execution.CANCELLED)
     
    -    def _executable_tasks(self):
    +    def _executable_tasks(self, ctx):
             now = datetime.utcnow()
    -        return (task for task in self._tasks_iter()
    -                if task.is_waiting() and
    -                task.due_at <= now and
    -                not self._task_has_dependencies(task))
    +        return (
    +            task for task in self._tasks_iter(ctx)
    +            if task.is_waiting() and task.due_at <= now and \
    +            not self._task_has_dependencies(ctx, task)
    +        )
     
    -    def _ended_tasks(self):
    -        return (task for task in self._tasks_iter() if task.has_ended())
    +    def _ended_tasks(self, ctx):
    +        for task in self._executing_tasks:
    +            if task.has_ended() and task in ctx._graph:
    +                yield task
     
    -    def _task_has_dependencies(self, task):
    -        return len(self._execution_graph.pred.get(task.id, {})) > 0
    -
    -    def _all_tasks_consumed(self):
    -        return len(self._execution_graph.node) == 0
    -
    -    def _tasks_iter(self):
    -        for _, data in self._execution_graph.nodes_iter(data=True):
    -            task = data['task']
    -            if isinstance(task, engine_task.OperationTask):
    -                if not task.model_task.has_ended():
    -                    self._workflow_context.model.task.refresh(task.model_task)
    -            yield task
    +    @staticmethod
    +    def _task_has_dependencies(ctx, task):
    +        return len(ctx._graph.pred.get(task, [])) > 0
     
         @staticmethod
    -    def _handle_executable_task(task):
    -        if isinstance(task, engine_task.OperationTask):
    -            events.sent_task_signal.send(task)
    -        task.execute()
    +    def _all_tasks_consumed(ctx):
    +        return len(ctx._graph.node) == 0
     
    -    def _handle_ended_tasks(self, task):
    +    @staticmethod
    +    def _tasks_iter(ctx):
    +        for task in ctx.execution.tasks:
    +            yield ctx.model.task.refresh(task)
    +
    +    def _handle_executable_task(self, ctx, task):
    +        if task._executor not in self._executors:
    +            self._executors[task._executor] = task._executor()
    +        task_executor = self._executors[task._executor]
    +
    +        context_cls = task._context_cls or operation.BaseOperationContext
    +        op_ctx = context_cls(
    +            model_storage=ctx.model,
    +            resource_storage=ctx.resource,
    +            workdir=ctx._workdir,
    +            task_id=task.id,
    +            actor_id=task.actor.id if task.actor else None,
    +            service_id=task.execution.service.id,
    +            execution_id=task.execution.id,
    +            name=task.name
    +        )
    +
    +        self._executing_tasks.append(task)
    +
    +        if not task.stub_type:
    +            events.sent_task_signal.send(op_ctx)
    +        task_executor.execute(op_ctx)
    +
    +    def _handle_ended_tasks(self, ctx, task):
    +        self._executing_tasks.remove(task)
             if task.status == models.Task.FAILED and not task.ignore_failure:
                 raise exceptions.ExecutorException('Workflow failed')
             else:
    -            self._execution_graph.remove_node(task.id)
    +            ctx._graph.remove_node(task)
    +
    +
    +def construct_execution_tasks(execution,
    +                              task_graph,
    +                              default_executor,
    +                              stub_executor=executor.base.StubTaskExecutor,
    +                              start_stub_type=models.Task.START_WORKFLOW,
    +                              end_stub_type=models.Task.END_WORKFLOW,
    +                              depends_on=()):
    +    """
    +    Translates the user graph to the execution graph
    +    :param task_graph: The user's graph
    +    :param start_stub_type: internal use
    +    :param end_stub_type: internal use
    +    :param depends_on: internal use
    +    """
    +    depends_on = list(depends_on)
    +
    +    # Insert start marker
    +    start_task = models.Task(api_id=_start_graph_suffix(task_graph.id),
    +                             _executor=stub_executor,
    +                             execution=execution,
    +                             stub_type=start_stub_type,
    +                             dependencies=depends_on)
    +
    +    for task in task_graph.topological_order(reverse=True):
    +        operation_dependencies = _get_tasks_from_dependencies(
    +            execution, task_graph.get_dependencies(task), [start_task])
    +
    +        if isinstance(task, api.task.OperationTask):
    +            models.Task.from_api_task(api_task=task,
    +                                      executor=default_executor,
    +                                      dependencies=operation_dependencies)
    +
    +        elif isinstance(task, api.task.WorkflowTask):
    +            # Build the graph recursively while adding start and end markers
    +            construct_execution_tasks(
    +                execution=execution,
    +                task_graph=task,
    +                default_executor=default_executor,
    +                stub_executor=stub_executor,
    +                start_stub_type=models.Task.START_SUBWROFKLOW,
    +                end_stub_type=models.Task.END_SUBWORKFLOW,
    +                depends_on=operation_dependencies
    +            )
    +        elif isinstance(task, api.task.StubTask):
    +            models.Task(api_id=task.id,
    +                        _executor=stub_executor,
    +                        execution=execution,
    +                        stub_type=models.Task.STUB,
    +                        dependencies=operation_dependencies)
    +        else:
    +            raise RuntimeError('Undefined state')
    +
    +    # Insert end marker
    +    models.Task(api_id=_end_graph_suffix(task_graph.id),
    +                _executor=stub_executor,
    +                execution=execution,
    +                stub_type=end_stub_type,
    +                dependencies=_get_non_dependent_tasks(execution) or [start_task])
    +
    +
    +def _start_graph_suffix(api_id):
    +    return '{0}-Start'.format(api_id)
    +
    +
    +def _end_graph_suffix(api_id):
    +    return '{0}-End'.format(api_id)
    +
    +
    +def _get_non_dependent_tasks(execution):
    +    dependency_tasks = set()
    +    for task in execution.tasks:
    +        dependency_tasks.update(task.dependencies)
    +    return list(set(execution.tasks) - set(dependency_tasks))
    +
    +
    +def _get_tasks_from_dependencies(execution, dependencies, default=()):
    +    """
    +    Returns task list from dependencies.
    +    """
    +    tasks = []
    +    for dependency in dependencies:
    +        if getattr(dependency, 'actor', False):
    --- End diff --
    
    Check stub_type here instead.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-ariatosca pull request #156: ARIA-278 remove core tasks

Posted by mxmrlv <gi...@git.apache.org>.
GitHub user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/156#discussion_r122954093
  
    --- Diff: tests/end2end/testenv.py ---
    @@ -60,6 +60,9 @@ def uninstall_service(self, service_name=None, service_template_name=None, dry=F
     
         def execute_workflow(self, service_name, workflow_name, dry=False):
             self.cli.executions.start(workflow_name, service_name=service_name, dry=dry)
    +        service = self.model_storage.service.get_by_name(service_name)
    --- End diff --
    
    ???


---

[GitHub] incubator-ariatosca pull request #156: ARIA-278 remove core tasks

Posted by mxmrlv <gi...@git.apache.org>.
GitHub user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/156#discussion_r123008654
  
    --- Diff: aria/orchestrator/workflows/api/task.py ---
    @@ -140,6 +140,12 @@ def __init__(self,
             self.arguments = modeling_utils.merge_parameter_values(arguments,
                                                                    operation.arguments,
                                                                    model_cls=models.Argument)
    +        if getattr(self.actor, 'outbound_relationships', None) is not None:
    +            self._context_cls = context.operation.NodeOperationContext
    +        elif getattr(self.actor, 'source_node', None) is not None:
    +            self._context_cls = context.operation.RelationshipOperationContext
    +        else:
    +            self._context_cls = context.operation.BaseOperationContext
    --- End diff --
    
    Raise an exception here instead of falling back to BaseOperationContext.
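
A possible reading of the note above is that the fallback branch should fail loudly rather than pick the base context. The sketch below shows that variant under stated assumptions: the three classes are empty stand-ins for the ones in context.operation, and select_context_cls and its error message are hypothetical names that do not appear in the pull request. Only the attribute names outbound_relationships and source_node come from the diff.

```python
class BaseOperationContext(object):
    """Stand-in for context.operation.BaseOperationContext."""


class NodeOperationContext(BaseOperationContext):
    """Stand-in for context.operation.NodeOperationContext."""


class RelationshipOperationContext(BaseOperationContext):
    """Stand-in for context.operation.RelationshipOperationContext."""


def select_context_cls(actor):
    """Pick a context class from the actor's shape (hypothetical helper)."""
    # Nodes are recognised by their outbound_relationships attribute.
    if getattr(actor, 'outbound_relationships', None) is not None:
        return NodeOperationContext
    # Relationships are recognised by their source_node attribute.
    if getattr(actor, 'source_node', None) is not None:
        return RelationshipOperationContext
    # Anything else is rejected instead of silently using the base context.
    raise RuntimeError(
        'Unsupported actor type: {0}'.format(type(actor).__name__))
```

Whether stub tasks without an actor should still be allowed through is a design question this sketch does not settle.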


---

[GitHub] incubator-ariatosca pull request #156: ARIA-278 remove core tasks

Posted by mxmrlv <gi...@git.apache.org>.
GitHub user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/156#discussion_r122952055
  
    --- Diff: aria/orchestrator/workflows/core/engine.py ---
    @@ -38,84 +35,192 @@ class Engine(logger.LoggerMixin):
         The workflow engine. Executes workflows
         """
     
    -    def __init__(self, executor, workflow_context, tasks_graph, **kwargs):
    +    def __init__(self, default_executor, **kwargs):
             super(Engine, self).__init__(**kwargs)
    -        self._workflow_context = workflow_context
    -        self._execution_graph = networkx.DiGraph()
    -        translation.build_execution_graph(task_graph=tasks_graph,
    -                                          execution_graph=self._execution_graph,
    -                                          default_executor=executor)
    +        self._executors = {default_executor.__class__: default_executor}
    +        self._executing_tasks = []
     
    -    def execute(self):
    +    def execute(self, ctx):
             """
             execute the workflow
             """
             try:
    -            events.start_workflow_signal.send(self._workflow_context)
    +            events.start_workflow_signal.send(ctx)
                 while True:
    -                cancel = self._is_cancel()
    +                cancel = self._is_cancel(ctx)
                     if cancel:
                         break
    -                for task in self._ended_tasks():
    -                    self._handle_ended_tasks(task)
    -                for task in self._executable_tasks():
    -                    self._handle_executable_task(task)
    -                if self._all_tasks_consumed():
    +                for task in self._ended_tasks(ctx):
    +                    self._handle_ended_tasks(ctx, task)
    +                for task in self._executable_tasks(ctx):
    +                    self._handle_executable_task(ctx, task)
    +                if self._all_tasks_consumed(ctx):
                         break
                     else:
                         time.sleep(0.1)
                 if cancel:
    -                events.on_cancelled_workflow_signal.send(self._workflow_context)
    +                events.on_cancelled_workflow_signal.send(ctx)
                 else:
    -                events.on_success_workflow_signal.send(self._workflow_context)
    +                events.on_success_workflow_signal.send(ctx)
             except BaseException as e:
    -            events.on_failure_workflow_signal.send(self._workflow_context, exception=e)
    +            events.on_failure_workflow_signal.send(ctx, exception=e)
                 raise
     
    -    def cancel_execution(self):
    +    @staticmethod
    +    def cancel_execution(ctx):
             """
             Send a cancel request to the engine. If execution already started, execution status
             will be modified to 'cancelling' status. If execution is in pending mode, execution status
             will be modified to 'cancelled' directly.
             """
    -        events.on_cancelling_workflow_signal.send(self._workflow_context)
    +        events.on_cancelling_workflow_signal.send(ctx)
     
    -    def _is_cancel(self):
    -        return self._workflow_context.execution.status in (models.Execution.CANCELLING,
    -                                                           models.Execution.CANCELLED)
    +    @staticmethod
    +    def _is_cancel(ctx):
    +        execution = ctx.model.execution.update(ctx.execution)
    +        return execution.status in (models.Execution.CANCELLING, models.Execution.CANCELLED)
     
    -    def _executable_tasks(self):
    +    def _executable_tasks(self, ctx):
             now = datetime.utcnow()
    -        return (task for task in self._tasks_iter()
    -                if task.is_waiting() and
    -                task.due_at <= now and
    -                not self._task_has_dependencies(task))
    +        return (
    +            task for task in self._tasks_iter(ctx)
    +            if task.is_waiting() and task.due_at <= now and \
    +            not self._task_has_dependencies(ctx, task)
    +        )
     
    -    def _ended_tasks(self):
    -        return (task for task in self._tasks_iter() if task.has_ended())
    +    def _ended_tasks(self, ctx):
    +        for task in self._executing_tasks:
    +            if task.has_ended() and task in ctx._graph:
    +                yield task
     
    -    def _task_has_dependencies(self, task):
    -        return len(self._execution_graph.pred.get(task.id, {})) > 0
    -
    -    def _all_tasks_consumed(self):
    -        return len(self._execution_graph.node) == 0
    -
    -    def _tasks_iter(self):
    -        for _, data in self._execution_graph.nodes_iter(data=True):
    -            task = data['task']
    -            if isinstance(task, engine_task.OperationTask):
    -                if not task.model_task.has_ended():
    -                    self._workflow_context.model.task.refresh(task.model_task)
    -            yield task
    +    @staticmethod
    +    def _task_has_dependencies(ctx, task):
    +        return len(ctx._graph.pred.get(task, [])) > 0
     
         @staticmethod
    -    def _handle_executable_task(task):
    -        if isinstance(task, engine_task.OperationTask):
    -            events.sent_task_signal.send(task)
    -        task.execute()
    +    def _all_tasks_consumed(ctx):
    +        return len(ctx._graph.node) == 0
     
    -    def _handle_ended_tasks(self, task):
    +    @staticmethod
    +    def _tasks_iter(ctx):
    +        for task in ctx.execution.tasks:
    +            yield ctx.model.task.refresh(task)
    +
    +    def _handle_executable_task(self, ctx, task):
    +        if task._executor not in self._executors:
    +            self._executors[task._executor] = task._executor()
    +        task_executor = self._executors[task._executor]
    +
    +        context_cls = task._context_cls or operation.BaseOperationContext
    +        op_ctx = context_cls(
    +            model_storage=ctx.model,
    +            resource_storage=ctx.resource,
    +            workdir=ctx._workdir,
    +            task_id=task.id,
    +            actor_id=task.actor.id if task.actor else None,
    +            service_id=task.execution.service.id,
    +            execution_id=task.execution.id,
    +            name=task.name
    +        )
    +
    +        self._executing_tasks.append(task)
    +
    +        if not task.stub_type:
    +            events.sent_task_signal.send(op_ctx)
    +        task_executor.execute(op_ctx)
    +
    +    def _handle_ended_tasks(self, ctx, task):
    +        self._executing_tasks.remove(task)
             if task.status == models.Task.FAILED and not task.ignore_failure:
                 raise exceptions.ExecutorException('Workflow failed')
             else:
    -            self._execution_graph.remove_node(task.id)
    +            ctx._graph.remove_node(task)
    +
    +
    +def construct_execution_tasks(execution,
    --- End diff --
    
    Move this function to a new module.
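
For readers following the thread, the start/end marker scheme that construct_execution_tasks applies can be sketched without the ARIA models. This is a simplified illustration, not the code under review: flatten_graph and its list-of-tuples input are hypothetical, tasks are plain strings with unique names, and items are assumed to be listed with their dependencies first.

```python
def flatten_graph(graph_id, graph, depends_on=()):
    """Flatten a nested graph into {task_name: [dependency names]}.

    `graph` is a list of (name, deps, subgraph) tuples, where `subgraph`
    is None for a plain task or another such list for a sub-workflow.
    Returns the flat mapping and the name of this graph's end marker.
    """
    start = '{0}-Start'.format(graph_id)
    end = '{0}-End'.format(graph_id)
    tasks = {start: list(depends_on)}
    # Maps each item to the task that later items should depend on
    # (the item itself, or the end marker of a sub-workflow).
    tail = {}
    for name, deps, subgraph in graph:
        # Items without dependencies hang off the start marker.
        resolved = [tail[dep] for dep in deps] or [start]
        if subgraph is None:
            tasks[name] = resolved
            tail[name] = name
        else:
            sub_tasks, sub_end = flatten_graph(name, subgraph, resolved)
            tasks.update(sub_tasks)
            tail[name] = sub_end
    # The end marker waits for every item nothing else depends on.
    depended_on = set(dep for deps in tasks.values() for dep in deps)
    leaves = sorted(t for t in tail.values() if t not in depended_on)
    tasks[end] = leaves or [start]
    return tasks, end


example = [
    ('a', [], None),
    ('b', ['a'], None),
    ('sub', ['a'], [('c', [], None)]),
]
flat, end_marker = flatten_graph('wf', example)
```

In this example the sub-workflow's start marker depends on 'a', and the outer end marker depends on both 'b' and the sub-workflow's end marker.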


---

[GitHub] incubator-ariatosca pull request #156: ARIA-278 remove core tasks

Posted by mxmrlv <gi...@git.apache.org>.
GitHub user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/156#discussion_r122946706
  
    --- Diff: aria/modeling/orchestration.py ---
    @@ -392,8 +397,66 @@ def abort(message=None):
         def retry(message=None, retry_interval=None):
             raise TaskRetryException(message, retry_interval=retry_interval)
     
    +    @declared_attr
    +    def dependency_fk(self):
    +        return relationship.foreign_key('task', nullable=True)
    +
    +    @declared_attr
    +    def dependencies(cls):
    +        # symmetric relationship causes funky graphs
    +        return relationship.one_to_many_self(cls, 'dependency_fk')
    +
    +    def has_ended(self):
    +        return self.status in (self.SUCCESS, self.FAILED)
    +
    +    def is_waiting(self):
    +        if self.stub_type:
    +            return not self.has_ended()
    +        else:
    +            return self.status in (self.PENDING, self.RETRYING)
    +
    +    @classmethod
    +    def from_api_task(cls, api_task, executor, **kwargs):
    +        from aria.orchestrator import context
    --- End diff --
    
    Get the context as an argument instead of importing it here.


---

[GitHub] incubator-ariatosca pull request #156: ARIA-278 remove core tasks

Posted by mxmrlv <gi...@git.apache.org>.
GitHub user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/156#discussion_r123009805
  
    --- Diff: aria/orchestrator/workflows/core/task.py ---
    @@ -0,0 +1,119 @@
    +# Licensed to the Apache Software Foundation (ASF) under one or more
    +# contributor license agreements.  See the NOTICE file distributed with
    +# this work for additional information regarding copyright ownership.
    +# The ASF licenses this file to You under the Apache License, Version 2.0
    +# (the "License"); you may not use this file except in compliance with
    +# the License.  You may obtain a copy of the License at
    +#
    +#     http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +
    +"""
    +The workflow engine. Executes workflows
    --- End diff --
    
    Remove this docstring.


---

[GitHub] incubator-ariatosca pull request #156: ARIA-278 remove core tasks

Posted by mxmrlv <gi...@git.apache.org>.
GitHub user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/156#discussion_r122949389
  
    --- Diff: aria/orchestrator/workflow_runner.py ---
    @@ -80,15 +80,19 @@ def __init__(self, workflow_name, service_id, inputs,
                 task_max_attempts=task_max_attempts,
                 task_retry_interval=task_retry_interval)
     
    +        # Set default executor and kwargs
    +        executor = executor or ProcessExecutor(plugin_manager=plugin_manager)
    +
             # transforming the execution inputs to dict, to pass them to the workflow function
             execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values())
    -        self._tasks_graph = workflow_fn(ctx=workflow_context, **execution_inputs_dict)
     
    -        executor = executor or ProcessExecutor(plugin_manager=plugin_manager)
    -        self._engine = Engine(
    -            executor=executor,
    -            workflow_context=workflow_context,
    -            tasks_graph=self._tasks_graph)
    +        self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
    +        engine.construct_execution_tasks(self.execution, self._tasks_graph, executor.__class__)
    +
    +        # Update the state
    +        self._model_storage.execution.update(execution)
    +
    +        self._engine = engine.Engine(default_executor=executor)
    --- End diff --
    
    Use executors=(executor, ) instead of default_executor=executor.
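
The suggestion above might look roughly like the following. It is a sketch under assumptions rather than the engine's actual code: EngineSketch, get_executor and the two executor classes are hypothetical stand-ins, and only the class-to-instance dictionary and the lazy instantiation mirror what the diff does with self._executors.

```python
class ProcessExecutorStub(object):
    """Stand-in for a real executor such as ProcessExecutor."""


class StubTaskExecutorStub(object):
    """Stand-in for the executor used by stub tasks."""


class EngineSketch(object):
    def __init__(self, executors=()):
        # Index the supplied executor instances by their class, so that a
        # task storing only an executor class can find its instance.
        self._executors = dict((e.__class__, e) for e in executors)

    def get_executor(self, executor_cls):
        # Lazily create executors that were not supplied up front.
        if executor_cls not in self._executors:
            self._executors[executor_cls] = executor_cls()
        return self._executors[executor_cls]


process_executor = ProcessExecutorStub()
engine = EngineSketch(executors=(process_executor, ))
```

Accepting a tuple keeps the constructor open to several preconfigured executors, while executors that need no arguments can still be created on demand.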


---

[GitHub] incubator-ariatosca pull request #156: ARIA-278 remove core tasks

Posted by asfgit <gi...@git.apache.org>.
GitHub user asfgit closed the pull request at:

    https://github.com/apache/incubator-ariatosca/pull/156

