Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/01/07 18:51:07 UTC

[GitHub] [airflow] dazza-codes commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

URL: https://github.com/apache/airflow/pull/7085#discussion_r363896237
 
 

 ##########
 File path: airflow/executors/base_executor.py
 ##########
 @@ -94,22 +96,43 @@ def queue_task_instance(
         # cfg_path is needed to propagate the config values if using impersonation
         # (run_as_user), given that there are different code paths running tasks.
         # For a long term solution we need to address AIRFLOW-1986
-        command_list_to_run = task_instance.command_as_list(
-            local=True,
+        deferred_run = task_instance.get_local_task_job_deferred_run(
             mark_success=mark_success,
             ignore_all_deps=ignore_all_deps,
             ignore_depends_on_past=ignore_depends_on_past,
             ignore_task_deps=ignore_task_deps,
             ignore_ti_state=ignore_ti_state,
             pool=pool,
             pickle_id=pickle_id,
-            cfg_path=cfg_path)
-        self.queue_command(
+            cfg_path=cfg_path,
+        )
+        self._queue_deferred_run(
             SimpleTaskInstance(task_instance),
-            command_list_to_run,
+            deferred_run,
             priority=task_instance.task.priority_weight_total,
             queue=task_instance.task.queue)
 
+    def queue_simple_task_instance(self, simple_task_instance: SimpleTaskInstance, simple_dag: SimpleDag):
+        """Queues simple task instance."""
+        priority = simple_task_instance.priority_weight
+        queue = simple_task_instance.queue
+
+        queue_task_run = LocalTaskJobDeferredRun(
+            dag_id=simple_task_instance.dag_id,
+            task_id=simple_task_instance.task_id,
+            execution_date=simple_task_instance.execution_date,
+            pool=simple_task_instance.pool,
+            subdir=simple_dag.full_filepath,
+            pickle_id=simple_dag.pickle_id
+        )
+
+        self._queue_deferred_run(
+            simple_task_instance,
+            queue_task_run,
+            priority=priority,
+            queue=queue
+        )
+
 
 Review comment:
   This method is starting to look like a feature-envy code smell: it reads one attribute after another from `SimpleTaskInstance` just to build another object. Does the refactor add or move a task queue onto `SimpleTaskInstance`? Perhaps the former `command` attributes belong on that class rather than on a new `LocalTaskJobDeferredRun` class, or is the latter perhaps a subclass of `SimpleTaskInstance`?
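
To make the feature-envy point concrete, here is a minimal sketch of the alternative, where the object that owns the data builds the run description itself. Everything in it is hypothetical: `DeferredRun`, `SimpleTaskInstanceSketch`, `to_deferred_run` and `ExecutorSketch` are illustrative stand-ins and not Airflow's actual classes or API. Only the field names are taken from the diff above.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Optional, Tuple


@dataclass(frozen=True)
class DeferredRun:
    """Hypothetical stand-in for LocalTaskJobDeferredRun: plain data describing a run."""
    dag_id: str
    task_id: str
    execution_date: datetime
    pool: Optional[str] = None
    subdir: Optional[str] = None
    pickle_id: Optional[int] = None


@dataclass(frozen=True)
class SimpleTaskInstanceSketch:
    """Hypothetical stand-in for SimpleTaskInstance, reduced to the fields the diff reads."""
    dag_id: str
    task_id: str
    execution_date: datetime
    pool: Optional[str]
    priority_weight: int
    queue: str

    def to_deferred_run(self, subdir: Optional[str] = None,
                        pickle_id: Optional[int] = None) -> DeferredRun:
        # The class that owns the attributes assembles the run, so callers
        # do not have to copy them out one by one.
        return DeferredRun(
            dag_id=self.dag_id,
            task_id=self.task_id,
            execution_date=self.execution_date,
            pool=self.pool,
            subdir=subdir,
            pickle_id=pickle_id,
        )


class ExecutorSketch:
    """Hypothetical executor that only stores what it is handed."""

    def __init__(self) -> None:
        self.queued: Dict[Tuple[str, str, datetime], Tuple[DeferredRun, int, str]] = {}

    def queue_simple_task_instance(self, sti: SimpleTaskInstanceSketch,
                                   subdir: Optional[str] = None,
                                   pickle_id: Optional[int] = None) -> DeferredRun:
        run = sti.to_deferred_run(subdir=subdir, pickle_id=pickle_id)
        key = (sti.dag_id, sti.task_id, sti.execution_date)
        self.queued[key] = (run, sti.priority_weight, sti.queue)
        return run
```

With this shape the executor method shrinks to a single call plus the queueing step, and the question of whether a separate run class is needed at all becomes easier to judge.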
   
   With regard to naming, why is `SimpleTaskInstance` not simply named `SimpleTask`? It seems odd for a class name to refer to an "instance" or "object" of the class (sorry, I don't know enough about this to understand why it needs `Instance` in the name). Introducing a new class raises a few questions about how these things relate as generics and specifics, whether terms such as `Simple` and `Local` vs. `Distributed` and `Delayed` need clarification, and what distinguishes a `Task` from a `Job` (pardon my ignorance; I could take this offline, of course). If a `DelayedTask` could use a mixin of both a `SimpleTask` and a `QueuedTask`, it might be clearer? The terms `Deferred` vs. `Queued` vs. `Async` could also be clarified, especially in the context of `asyncio`, where `Async` could imply an event loop and cooperative concurrency.
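
As a rough illustration of the mixin idea, the sketch below composes a `DelayedTask` from a `SimpleTask` and a `QueuedTask` using cooperative `__init__` methods. All three class names come from the suggestion above and are hypothetical; none of them exist in Airflow, and the attributes chosen for each are assumptions made only for the example.

```python
class SimpleTask:
    """Hypothetical: identifies what should run."""

    def __init__(self, dag_id, task_id, **kwargs):
        # Pass the remaining keyword arguments along the MRO.
        super().__init__(**kwargs)
        self.dag_id = dag_id
        self.task_id = task_id


class QueuedTask:
    """Hypothetical: describes where and with what priority it is queued."""

    def __init__(self, queue="default", priority=1, **kwargs):
        super().__init__(**kwargs)
        self.queue = queue
        self.priority = priority


class DelayedTask(SimpleTask, QueuedTask):
    """Hypothetical: a task that is both identified and queued."""


task = DelayedTask(dag_id="example_dag", task_id="example_task",
                   queue="high_mem", priority=5)
```

Each base class then carries one concern, which is one way the `Simple`, `Queued` and `Delayed` terms could be given distinct meanings.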
