Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/01/06 20:22:02 UTC

[GitHub] [airflow] mik-laj opened a new pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

mik-laj opened a new pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085
 
 
   Most of the executors run local task jobs by running `airflow tasks run ...`. This is achieved by passing a `['airflow', 'tasks', 'run', ...]` list to `subprocess.check_call`.
   
   This is very limiting when creating new executors that do not necessarily want to start a new process for each task, e.g. executors that fork a process instead of creating one.
   
   We could achieve a similar effect by parsing the argument list back, but that is an ugly, hacky solution, so I refactored the code: the executor now passes a `LocalTaskJobDeferredRun` object that contains all the detailed information. A particular executor can still create a command if it needs one.
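
As a rough illustration of the idea (a minimal sketch, not the code from this PR): `DeferredRunSketch`, `command_for_subprocess` and `fields_for_in_process` are hypothetical names, and only the `airflow tasks run` command and the `--pool` flag are taken from this thread. The point is that one object can serve both an executor that needs a CLI command and an executor that reads the fields directly.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional, Tuple


@dataclass
class DeferredRunSketch:
    """Hypothetical stand-in for LocalTaskJobDeferredRun (not the PR's class)."""

    dag_id: str
    task_id: str
    execution_date: datetime
    pool: Optional[str] = None

    def as_command(self) -> List[str]:
        """Build the CLI argument list only when an executor asks for it."""
        cmd = [
            "airflow", "tasks", "run",
            self.dag_id, self.task_id, self.execution_date.isoformat(),
        ]
        if self.pool:
            cmd.extend(["--pool", self.pool])
        return cmd


def command_for_subprocess(run: DeferredRunSketch) -> List[str]:
    """A process-spawning executor would hand this list to subprocess.check_call."""
    return run.as_command()


def fields_for_in_process(run: DeferredRunSketch) -> Tuple[str, str, datetime]:
    """A forking executor can read the structured fields; no argv parsing needed."""
    return (run.dag_id, run.task_id, run.execution_date)
```

With a plain list of strings, the second function would have to parse positional arguments and flags back out of the command.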
   
   This will facilitate the development of other executors:
   https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-29%3A+AWS+Fargate+Executor (@aelzeiny)
   https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-28%3A+Add+AsyncExecutor+option (@dazza-codes)
   https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-25+The+Knative+Executor (@dimberman)
   https://github.com/apache/airflow/pull/6750 (@nuclearpinguin)
   This also made the DebugExecutor code simpler. (@nuclearpinguin)
   ---
   Issue link: WILL BE INSERTED BY [boring-cyborg](https://github.com/kaxil/boring-cyborg)
   
   - [X] Description above provides context of the change
   - [X] Commit message/PR title starts with `[AIRFLOW-NNNN]`. AIRFLOW-NNNN = JIRA ID<sup>*</sup>
   - [X] Unit tests coverage for changes (not needed for documentation changes)
   - [X] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [X] Relevant documentation is updated including usage instructions.
   - [X] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   <sup>*</sup> For document-only changes commit message can start with `[AIRFLOW-XXXX]`.
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services





[GitHub] [airflow] dazza-codes commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
dazza-codes commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r363896237
 
 

 ##########
 File path: airflow/executors/base_executor.py
 ##########
 @@ -94,22 +96,43 @@ def queue_task_instance(
         # cfg_path is needed to propagate the config values if using impersonation
         # (run_as_user), given that there are different code paths running tasks.
         # For a long term solution we need to address AIRFLOW-1986
-        command_list_to_run = task_instance.command_as_list(
-            local=True,
+        deferred_run = task_instance.get_local_task_job_deferred_run(
             mark_success=mark_success,
             ignore_all_deps=ignore_all_deps,
             ignore_depends_on_past=ignore_depends_on_past,
             ignore_task_deps=ignore_task_deps,
             ignore_ti_state=ignore_ti_state,
             pool=pool,
             pickle_id=pickle_id,
-            cfg_path=cfg_path)
-        self.queue_command(
+            cfg_path=cfg_path,
+        )
+        self._queue_deferred_run(
             SimpleTaskInstance(task_instance),
-            command_list_to_run,
+            deferred_run,
             priority=task_instance.task.priority_weight_total,
             queue=task_instance.task.queue)
 
+    def queue_simple_task_instance(self, simple_task_instance: SimpleTaskInstance, simple_dag: SimpleDag):
+        """Queues simple task instance."""
+        priority = simple_task_instance.priority_weight
+        queue = simple_task_instance.queue
+
+        queue_task_run = LocalTaskJobDeferredRun(
+            dag_id=simple_task_instance.dag_id,
+            task_id=simple_task_instance.task_id,
+            execution_date=simple_task_instance.execution_date,
+            pool=simple_task_instance.pool,
+            subdir=simple_dag.full_filepath,
+            pickle_id=simple_dag.pickle_id
+        )
+
+        self._queue_deferred_run(
+            simple_task_instance,
+            queue_task_run,
+            priority=priority,
+            queue=queue
+        )
+
 
 Review comment:
   Looking at this method, it starts to look like a feature-envy code smell. Does the refactor somehow add or move a task queue onto `SimpleTaskInstance`? Might the former `command` attributes belong on that class instead of on a new `LocalTaskJobDeferredRun` class, or is the latter perhaps a subclass of `SimpleTaskInstance`?
   
   With regard to naming, why is `SimpleTaskInstance` not named simply `SimpleTask`? It seems odd to have a class name that refers to an "instance" or "object" of the class (sorry, I don't know enough about this thing to understand why it needs `Instance` in the class name). Introducing a new class raises a few questions about how these things are related in terms of generics and specifics, whether `Simple` and `Local` vs. `Distributed` and `Delayed` need clarification, and what the distinction is between `Task` and `Job` (pardon my ignorance; I could take this offline, of course). If a `DelayedTask` could use a mixin of both a `SimpleTask` and a `QueuedTask`, it might be clearer. The use of the terms `Deferred` vs. `Queued` vs. `Async` could also be clarified, especially in the context of `asyncio`, where `Async` could imply an event loop and cooperative concurrency.


[GitHub] [airflow] dazza-codes commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
dazza-codes commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r363898046
 
 

 ##########
 File path: airflow/executors/base_executor.py
 ##########
 @@ -232,14 +255,14 @@ def get_event_buffer(self, dag_ids=None) -> Dict[TaskInstanceKeyType, Optional[s
 
     def execute_async(self,
                       key: TaskInstanceKeyType,
-                      command: CommandType,
+                      deferred_run: LocalTaskJobDeferredRun,
                       queue: Optional[str] = None,
 
 Review comment:
   nit: the number of concepts in the name `LocalTaskJobDeferredRun` raises many questions about what that thing is, and about how the OOP design is evolving such that a combination of so many concepts is necessary in one class name. For example, why is it not simply `DeferredTask`, `LocalTask`, or `QueuedTask` (`Queued` implies `DeferredRun`)? What is the difference between `Task` and `Job`, and why does this one class capture both concepts at the same time? The method name uses `async`, the parameter and its class name use `deferred`, and the method itself calls `queue` methods, so there are many similar concepts involved here. If the public API is changing anyway, this could be an opportunity to review and clarify the terms used. (Of course, I'm happy to take this nit-pick offline, where I might get educated about everything that makes this combination of concepts necessary. Also, I understand these questions imply some scope creep for this PR and, well, yeah, sorry about that.)
   - maybe `execute_async` is better named `queue_job`?


[GitHub] [airflow] dimberman commented on issue #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
dimberman commented on issue #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#issuecomment-572265141
 
 
   A massive +1 on this one. The current approach also makes debugging/testing a nightmare.


[GitHub] [airflow] ashb commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r364721618
 
 

 ##########
 File path: airflow/executors/base_executor.py
 ##########
 @@ -232,14 +255,14 @@ def get_event_buffer(self, dag_ids=None) -> Dict[TaskInstanceKeyType, Optional[s
 
     def execute_async(self,
                       key: TaskInstanceKeyType,
-                      command: CommandType,
+                      deferred_run: LocalTaskJobDeferredRun,
                       queue: Optional[str] = None,
 
 Review comment:
   Oh right, all the `ignore_*` etc. Gotcha. I somehow managed to forget all of those.
   
   I think we should avoid "queue" here, as Airflow already has a concept of queues, and that is not what we are dealing with at this level.
   
   I think Celery calls this a "Request" (http://docs.celeryproject.org/en/latest/reference/celery.worker.request.html). How about `TaskExecutionRequest`? That, or SomethingMessage, e.g. `TaskExecutionMessage`.
   
   (Sorry to be picky about names, but I think it's important here.)
   


[GitHub] [airflow] mik-laj commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r364740101
 
 

 ##########
 File path: airflow/models/queue_task_run.py
 ##########
 @@ -0,0 +1,206 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from airflow.utils import timezone
+
+
+class RawTaskDeferredRun:  # pylint: disable=too-many-instance-attributes
+    """
+    Generates the shell command required to execute this task instance.
+
+    :param dag_id: DAG ID
+    :type dag_id: unicode
+    :param task_id: Task ID
+    :type task_id: unicode
+    :param execution_date: Execution date for the task
+    :type execution_date: datetime.datetime
+    :param mark_success: Whether to mark the task as successful
+    :type mark_success: bool
+    :param pickle_id: If the DAG was serialized to the DB, the ID
+        associated with the pickled DAG
+    :type pickle_id: unicode
+    :param subdir: path to the file containing the DAG definition
+    :param job_id: job ID (needs more details)
+    :param pool: the Airflow pool that the task should run in
+    :type pool: unicode
+    :param cfg_path: the Path to the configuration file
+    :type cfg_path: str
+    :return: shell command that can be used to run the task instance
+    """
+    def __init__(  # pylint: disable=too-many-arguments
+        self,
+        dag_id,
+        task_id,
+        execution_date,
+        mark_success=None,
+        pickle_id=None,
+        job_id=None,
+        force=None,
+        pool=None,
+        subdir=None,
+        cfg_path=None,
+        mock_command=None,
+    ):
+        self.dag_id = dag_id
+        self.task_id = task_id
+        if isinstance(execution_date, str):
+            self.execution_date = timezone.parse(execution_date)
+        else:
+            self.execution_date = execution_date
+        self.mark_success = mark_success
+        self.pickle_id = pickle_id
+        self.job_id = job_id
+        self.force = force
+        self.pool = pool
+        self.subdir = subdir
+        self.cfg_path = cfg_path
+        self.mock_command = mock_command
+
+    def as_command(self):
+        """Generate CLI command"""
+        if self.mock_command:
+            return self.mock_command
+        iso = self.execution_date.isoformat()
+        cmd = ["airflow", "tasks", "run", str(self.dag_id), str(self.task_id), str(iso), "--raw"]
+        if self.mark_success:
+            cmd.extend(["--mark_success"])
+        if self.pickle_id:
+            cmd.extend(["--pickle", str(self.pickle_id)])
+        if self.job_id:
+            cmd.extend(["--job_id", str(self.job_id)])
+        if self.force:
+            cmd.extend(["--force"])
+        if self.pool:
+            cmd.extend(["--pool", self.pool])
+        if self.subdir:
+            cmd.extend(["--subdir", self.subdir])
+        if self.cfg_path:
+            cmd.extend(["--cfg_path", self.cfg_path])
+        return cmd
+
+    def __repr__(self):
+        iso = self.execution_date.isoformat()
+        return f"RawTaskDeferredRun(dag_id={self.dag_id}, task_id={self.task_id}, execution_date={iso})"
+
+
+class LocalTaskJobDeferredRun:  # pylint: disable=too-many-instance-attributes
+    """
+    Generates the shell command required to execute this task instance.
+
+    :param dag_id: DAG ID
+    :type dag_id: unicode
+    :param task_id: Task ID
+    :type task_id: unicode
+    :param execution_date: Execution date for the task
+    :type execution_date: datetime.datetime
+    :param mark_success: Whether to mark the task as successful
+    :type mark_success: bool
+    :param ignore_all_dependencies: Ignore all ignorable dependencies.
+        Overrides the other ignore_* parameters.
+    :type ignore_all_dependencies: bool
+    :param ignore_depends_on_past: Ignore depends_on_past parameter of DAGs
+        (e.g. for Backfills)
+    :type ignore_depends_on_past: bool
+    :param ignore_dependencies: Ignore task-specific dependencies such as depends_on_past
+        and trigger rule
+    :type ignore_dependencies: bool
+    :param force: Ignore the task instance's previous failure/success
+    :type force: bool
+    :param local: Whether to run the task locally
+    :type local: bool
+    :param pickle_id: If the DAG was serialized to the DB, the ID
+        associated with the pickled DAG
+    :type pickle_id: unicode
+    :param subdir: path to the file containing the DAG definition
+    :param raw: raw mode (needs more details)
+    :param job_id: job ID (needs more details)
+    :param pool: the Airflow pool that the task should run in
+    :type pool: unicode
+    :param cfg_path: the Path to the configuration file
+    :type cfg_path: str
+    :return: shell command that can be used to run the task instance
+    """
+    def __init__(  # pylint: disable=too-many-arguments
+        self,
+        dag_id,
+        task_id,
+        execution_date,
+        mark_success=None,
+        pickle_id=None,
+        job_id=None,
+        ignore_all_dependencies=None,
+        ignore_dependencies=None,
+        ignore_depends_on_past=None,
+        force=None,
+        local=None,
+        pool=None,
+        raw=None,
+        subdir=None,
+        cfg_path=None,
 
 Review comment:
   They can't, because these values are not known when creating `SimpleTaskInstance`. We could try to set the attributes later, but in my opinion that mixes concepts. I prefer a class to have only one responsibility.
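
The separation argued for above could look roughly like the following. This is a minimal sketch under assumptions: `TaskIdentitySketch`, `RunRequestSketch` and `queue_run` are hypothetical names rather than classes from the PR, and only the option names (`mark_success`, `ignore_all_dependencies`, `cfg_path`) are taken from the diff.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class TaskIdentitySketch:
    """Hypothetical stand-in for SimpleTaskInstance: known at scheduling time."""

    dag_id: str
    task_id: str


@dataclass(frozen=True)
class RunRequestSketch:
    """Hypothetical stand-in for the deferred-run object: built at queue time."""

    identity: TaskIdentitySketch
    mark_success: bool = False
    ignore_all_dependencies: bool = False
    cfg_path: Optional[str] = None


def queue_run(identity: TaskIdentitySketch, **run_options) -> RunRequestSketch:
    """Combine an existing identity with options that only exist when queueing."""
    return RunRequestSketch(identity=identity, **run_options)
```

Here the identity object is never mutated after creation; the run options live only on the request that wraps it, so each class keeps a single responsibility.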


[GitHub] [airflow] stale[bot] commented on issue #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
stale[bot] commented on issue #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#issuecomment-609032116
 
 
   This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
   


[GitHub] [airflow] ashb commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r364659359
 
 

 ##########
 File path: airflow/models/queue_task_run.py
 ##########
 @@ -0,0 +1,206 @@
 
 Review comment:
   There's an _awful_ lot of overlap here with SimpleTaskInstance -- could this all instead be methods on SimpleTI (if we don't just remove this entirely as per my previous comment)?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
nuclearpinguin commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r363750540
 
 

 ##########
 File path: airflow/models/queue_task_run.py
 ##########
 @@ -0,0 +1,206 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from airflow.utils import timezone
+
+
+class RawTaskDeferredRun:  # pylint: disable=too-many-instance-attributes
 
 Review comment:
   Wouldn't `DeferredRawTaskRun` be better? As discussed offline I have mixed feelings regarding the `deferred` word. Maybe `future` or just `queued`? In the end, it's something that was queued. I have no strong opinion.


[GitHub] [airflow] ashb commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r364654068
 
 

 ##########
 File path: UPDATING.md
 ##########
 @@ -57,6 +57,44 @@ https://developers.google.com/style/inclusive-documentation
 
 -->
 
+### Introduction of LocalTaskJobDeferred in the Executor.
+
+The executor uses ``LocalTaskJobDeferredRun`` instead of a list of strings with the command to be executed.
+All methods and fields that previously used the `command` parameter now use `deferred_run`.
+If your executor only extends non-implemented methods from BaseExecutor, you only need to update
+the ``execute_async`` method.
 
 Review comment:
   ```suggestion
   the `execute_async` method.
   ```


[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
nuclearpinguin commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r364305865
 
 

 ##########
 File path: airflow/models/queue_task_run.py
 ##########
 @@ -0,0 +1,206 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from airflow.utils import timezone
+
+
+class RawTaskDeferredRun:  # pylint: disable=too-many-instance-attributes
+    """
+    Generates the shell command required to execute this task instance.
+
+    :param dag_id: DAG ID
+    :type dag_id: unicode
+    :param task_id: Task ID
+    :type task_id: unicode
+    :param execution_date: Execution date for the task
+    :type execution_date: datetime.datetime
+    :param mark_success: Whether to mark the task as successful
+    :type mark_success: bool
+    :param pickle_id: If the DAG was serialized to the DB, the ID
+        associated with the pickled DAG
+    :type pickle_id: unicode
+    :param subdir: path to the file containing the DAG definition
+    :param job_id: job ID (needs more details)
+    :param pool: the Airflow pool that the task should run in
+    :type pool: unicode
+    :param cfg_path: the Path to the configuration file
+    :type cfg_path: str
+    :return: shell command that can be used to run the task instance
+    """
+    def __init__(  # pylint: disable=too-many-arguments
+        self,
+        dag_id,
+        task_id,
+        execution_date,
+        mark_success=None,
+        pickle_id=None,
+        job_id=None,
+        force=None,
+        pool=None,
+        subdir=None,
+        cfg_path=None,
+        mock_command=None,
+    ):
+        self.dag_id = dag_id
+        self.task_id = task_id
+        if isinstance(execution_date, str):
+            self.execution_date = timezone.parse(execution_date)
+        else:
+            self.execution_date = execution_date
+        self.mark_success = mark_success
+        self.pickle_id = pickle_id
+        self.job_id = job_id
+        self.force = force
+        self.pool = pool
+        self.subdir = subdir
+        self.cfg_path = cfg_path
+        self.mock_command = mock_command
+
+    def as_command(self):
+        """Generate CLI command"""
+        if self.mock_command:
+            return self.mock_command
+        iso = self.execution_date.isoformat()
+        cmd = ["airflow", "tasks", "run", str(self.dag_id), str(self.task_id), str(iso), "--raw"]
+        if self.mark_success:
+            cmd.extend(["--mark_success"])
+        if self.pickle_id:
+            cmd.extend(["--pickle", str(self.pickle_id)])
+        if self.job_id:
+            cmd.extend(["--job_id", str(self.job_id)])
+        if self.force:
+            cmd.extend(["--force"])
+        if self.pool:
+            cmd.extend(["--pool", self.pool])
+        if self.subdir:
+            cmd.extend(["--subdir", self.subdir])
+        if self.cfg_path:
+            cmd.extend(["--cfg_path", self.cfg_path])
+        return cmd
+
+    def __repr__(self):
+        iso = self.execution_date.isoformat()
+        return f"RawTaskDeferredRun(dag_id={self.dag_id}, task_id={self.task_id}, execution_date={iso})"
+
+
+class LocalTaskJobDeferredRun:  # pylint: disable=too-many-instance-attributes
 
 Review comment:
   > We now have two classes and a clearly defined purpose.
   
   That's true when you know what's going on. Updating the docstring should help, but I think it's worth mentioning somewhere that this is one of many steps to improve how executors work. 
   


[GitHub] [airflow] dazza-codes commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
dazza-codes commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r363904383
 
 

 ##########
 File path: airflow/executors/celery_executor.py
 ##########
 @@ -22,15 +22,16 @@
 import time
 import traceback
 from multiprocessing import Pool, cpu_count
-from typing import Any, List, Optional, Tuple, Union
+from typing import Any, Dict, List, Optional, Tuple, Union
 
 from celery import Celery, Task, states as celery_states
 from celery.result import AsyncResult
 
 from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.executors.base_executor import BaseExecutor, CommandType
+from airflow.executors.base_executor import BaseExecutor
+from airflow.models.queue_task_run import LocalTaskJobDeferredRun
 from airflow.models.taskinstance import SimpleTaskInstance, TaskInstanceKeyType, TaskInstanceStateType
 
 Review comment:
   - If `queue_task_run` owns the new class, that implies a class name more like `QueuedTask`. We can forget about `run` because that's implied anyway, isn't it? Also, could we avoid combining `Task` with `Job`, or else use `QueuedJob`, with `LocalQueuedJob` as a subclass of `QueuedJob`?
   - `SimpleTaskInstance` is an instance of a task that could recur on a schedule. I wonder whether `Instance` and `Job` overlap, how those terms relate to scheduled tasks, task instances and task retries, and whether a `job` is just one of the task-instance tries or something similar (this is my best guess at how these complex concepts chain together).


[GitHub] [airflow] mik-laj commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r364654111
 
 

 ##########
 File path: airflow/executors/base_executor.py
 ##########
 @@ -232,14 +255,14 @@ def get_event_buffer(self, dag_ids=None) -> Dict[TaskInstanceKeyType, Optional[s
 
     def execute_async(self,
                       key: TaskInstanceKeyType,
-                      command: CommandType,
+                      deferred_run: LocalTaskJobDeferredRun,
                       queue: Optional[str] = None,
 
 Review comment:
   I will change its name to QueueTask and move it to the `executor` package.
   
   > maybe execute_async is better named queue_job?
   
   I did not want changes in this PR that are not necessary. I will change the name of this method in a moment. 
   


[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
nuclearpinguin commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r364750670
 
 

 ##########
 File path: airflow/executors/base_executor.py
 ##########
 @@ -232,14 +255,14 @@ def get_event_buffer(self, dag_ids=None) -> Dict[TaskInstanceKeyType, Optional[s
 
     def execute_async(self,
                       key: TaskInstanceKeyType,
-                      command: CommandType,
+                      deferred_run: LocalTaskJobDeferredRun,
                       queue: Optional[str] = None,
 
 Review comment:
   `TaskExecutionRequest` +1


[GitHub] [airflow] mik-laj commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r364741607
 
 

 ##########
 File path: airflow/executors/base_executor.py
 ##########
 @@ -232,14 +255,14 @@ def get_event_buffer(self, dag_ids=None) -> Dict[TaskInstanceKeyType, Optional[s
 
     def execute_async(self,
                       key: TaskInstanceKeyType,
-                      command: CommandType,
+                      deferred_run: LocalTaskJobDeferredRun,
                       queue: Optional[str] = None,
 
 Review comment:
   TaskExecutionRequest sounds good to me too.


[GitHub] [airflow] mik-laj commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r364654111
 
 

 ##########
 File path: airflow/executors/base_executor.py
 ##########
 @@ -232,14 +255,14 @@ def get_event_buffer(self, dag_ids=None) -> Dict[TaskInstanceKeyType, Optional[s
 
     def execute_async(self,
                       key: TaskInstanceKeyType,
-                      command: CommandType,
+                      deferred_run: LocalTaskJobDeferredRun,
                       queue: Optional[str] = None,
 
 Review comment:
   QueuedTask
   
   > maybe execute_async is better named queue_job?
   
   I did not want changes in this PR that are not necessary. I will change the name of this method in a moment. 
   


[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
nuclearpinguin commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r363743406
 
 

 ##########
 File path: airflow/models/queue_task_run.py
 ##########
 @@ -0,0 +1,206 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from airflow.utils import timezone
+
+
+class RawTaskDeferredRun:  # pylint: disable=too-many-instance-attributes
+    """
+    Generates the shell command required to execute this task instance.
+
+    :param dag_id: DAG ID
+    :type dag_id: unicode
+    :param task_id: Task ID
+    :type task_id: unicode
+    :param execution_date: Execution date for the task
+    :type execution_date: datetime.datetime
+    :param mark_success: Whether to mark the task as successful
+    :type mark_success: bool
+    :param pickle_id: If the DAG was serialized to the DB, the ID
+        associated with the pickled DAG
+    :type pickle_id: unicode
+    :param subdir: path to the file containing the DAG definition
+    :param job_id: job ID (needs more details)
+    :param pool: the Airflow pool that the task should run in
+    :type pool: unicode
+    :param cfg_path: the Path to the configuration file
+    :type cfg_path: str
+    :return: shell command that can be used to run the task instance
+    """
+    def __init__(  # pylint: disable=too-many-arguments
+        self,
+        dag_id,
+        task_id,
+        execution_date,
+        mark_success=None,
+        pickle_id=None,
+        job_id=None,
+        force=None,
+        pool=None,
+        subdir=None,
+        cfg_path=None,
+        mock_command=None,
+    ):
+        self.dag_id = dag_id
+        self.task_id = task_id
+        if isinstance(execution_date, str):
+            self.execution_date = timezone.parse(execution_date)
+        else:
+            self.execution_date = execution_date
+        self.mark_success = mark_success
+        self.pickle_id = pickle_id
+        self.job_id = job_id
+        self.force = force
+        self.pool = pool
+        self.subdir = subdir
+        self.cfg_path = cfg_path
+        self.mock_command = mock_command
+
+    def as_command(self):
+        """Generate CLI command"""
+        if self.mock_command:
+            return self.mock_command
+        iso = self.execution_date.isoformat()
+        cmd = ["airflow", "tasks", "run", str(self.dag_id), str(self.task_id), str(iso), "--raw"]
+        if self.mark_success:
+            cmd.extend(["--mark_success"])
+        if self.pickle_id:
+            cmd.extend(["--pickle", str(self.pickle_id)])
+        if self.job_id:
+            cmd.extend(["--job_id", str(self.job_id)])
+        if self.force:
+            cmd.extend(["--force"])
+        if self.pool:
+            cmd.extend(["--pool", self.pool])
+        if self.subdir:
+            cmd.extend(["--subdir", self.subdir])
+        if self.cfg_path:
+            cmd.extend(["--cfg_path", self.cfg_path])
+        return cmd
+
+    def __repr__(self):
+        iso = self.execution_date.isoformat()
+        return f"RawTaskDeferredRun(dag_id={self.dag_id}, task_id={self.task_id}, execution_date={iso})"
+
+
+class LocalTaskJobDeferredRun:  # pylint: disable=too-many-instance-attributes
 
 Review comment:
   Do we have to introduce two different classes?


[GitHub] [airflow] mik-laj commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r363777195
 
 

 ##########
 File path: airflow/models/queue_task_run.py
 ##########
 @@ -0,0 +1,206 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from airflow.utils import timezone
+
+
+class RawTaskDeferredRun:  # pylint: disable=too-many-instance-attributes
+    """
+    Generates the shell command required to execute this task instance.
+
+    :param dag_id: DAG ID
+    :type dag_id: unicode
+    :param task_id: Task ID
+    :type task_id: unicode
+    :param execution_date: Execution date for the task
+    :type execution_date: datetime.datetime
+    :param mark_success: Whether to mark the task as successful
+    :type mark_success: bool
+    :param pickle_id: If the DAG was serialized to the DB, the ID
+        associated with the pickled DAG
+    :type pickle_id: unicode
+    :param subdir: path to the file containing the DAG definition
+    :param job_id: job ID (needs more details)
+    :param pool: the Airflow pool that the task should run in
+    :type pool: unicode
+    :param cfg_path: the Path to the configuration file
+    :type cfg_path: str
+    :return: shell command that can be used to run the task instance
+    """
+    def __init__(  # pylint: disable=too-many-arguments
+        self,
+        dag_id,
+        task_id,
+        execution_date,
+        mark_success=None,
+        pickle_id=None,
+        job_id=None,
+        force=None,
+        pool=None,
+        subdir=None,
+        cfg_path=None,
+        mock_command=None,
+    ):
+        self.dag_id = dag_id
+        self.task_id = task_id
+        if isinstance(execution_date, str):
+            self.execution_date = timezone.parse(execution_date)
+        else:
+            self.execution_date = execution_date
+        self.mark_success = mark_success
+        self.pickle_id = pickle_id
+        self.job_id = job_id
+        self.force = force
+        self.pool = pool
+        self.subdir = subdir
+        self.cfg_path = cfg_path
+        self.mock_command = mock_command
+
+    def as_command(self):
+        """Generate CLI command"""
+        if self.mock_command:
+            return self.mock_command
+        iso = self.execution_date.isoformat()
+        cmd = ["airflow", "tasks", "run", str(self.dag_id), str(self.task_id), str(iso), "--raw"]
+        if self.mark_success:
+            cmd.extend(["--mark_success"])
+        if self.pickle_id:
+            cmd.extend(["--pickle", str(self.pickle_id)])
+        if self.job_id:
+            cmd.extend(["--job_id", str(self.job_id)])
+        if self.force:
+            cmd.extend(["--force"])
+        if self.pool:
+            cmd.extend(["--pool", self.pool])
+        if self.subdir:
+            cmd.extend(["--subdir", self.subdir])
+        if self.cfg_path:
+            cmd.extend(["--cfg_path", self.cfg_path])
+        return cmd
+
+    def __repr__(self):
+        iso = self.execution_date.isoformat()
+        return f"RawTaskDeferredRun(dag_id={self.dag_id}, task_id={self.task_id}, execution_date={iso})"
+
+
+class LocalTaskJobDeferredRun:  # pylint: disable=too-many-instance-attributes
 
 Review comment:
   We don't have to, but I think we should. This allows better type support and makes the code easier to understand. Earlier we had a list of strings that did not convey its purpose. We now have two classes and a clearly defined purpose.
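
To illustrate the type-support argument, here is a minimal sketch. It is not the PR's code: `TaskRunRequest` is a hypothetical, reduced stand-in for the classes under review, with only a few of their fields, and the values at the bottom are made up.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class TaskRunRequest:
    """Hypothetical, reduced stand-in for the classes introduced in this PR."""

    dag_id: str
    task_id: str
    execution_date: datetime
    pool: Optional[str] = None

    def as_command(self) -> List[str]:
        # Build the CLI form only when an executor actually needs it.
        cmd = ["airflow", "tasks", "run", self.dag_id, self.task_id,
               self.execution_date.isoformat()]
        if self.pool:
            cmd.extend(["--pool", self.pool])
        return cmd


request = TaskRunRequest("example_dag", "example_task", datetime(2020, 1, 6), pool="default")

# Fields are read by name rather than by position in a list of strings.
print(request.task_id)
print(request.as_command())
```

With a plain list of strings, an executor that wants the task ID has to know its position in the command; with an object, it reads an attribute and a type checker can verify the access.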


[GitHub] [airflow] ashb commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r364653749
 
 

 ##########
 File path: UPDATING.md
 ##########
 @@ -57,6 +57,44 @@ https://developers.google.com/style/inclusive-documentation
 
 -->
 
+### Introduction of LocalTaskJobDeferred in the Executor.
+
+The executor uses ``LocalTaskJobDeferredRun`` instead of a list of strings with the command to be executed.
+All methods and fields that previously used the `command` parameter now use `deferred_run`.
+If your executor only extends non-implemented methods from BaseExecutor, you only need to update
+the ``execute_async`` method.
+
+The code below
+```diff
+    def execute_async(
+        self,
+        key: TaskInstanceKeyType,
+        command: CommandType,
+        queue: Optional[str] = None,
+        executor_config: Optional[Any] = None) -> None:
+
+        [...]
+
+        self.task_queue.put((key, command))
+```
+can be replaced by the following code:
+```diff
+    def execute_async(
+        self,
+        key: TaskInstanceKeyType,
+        deferred_run: LocalTaskJobDeferredRun,
+        queue: Optional[str] = None,
+        executor_config: Optional[Any] = None) -> None:
+
+        [...]
+
+        command = deferred_run.as_command()
+        self.task_queue.put((key, command))
+```
+
+This change allows the development of executors that run LocalTaskJob in a different way e.g.
+using fork instead of creating a new process.
 
 Review comment:
   This line doesn't quite hold true -- I can easily see how to change the task to fork instead of exec without any of these changes.
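
For readers unfamiliar with the distinction being debated, the following is a minimal sketch of the two launch strategies. It is generic Python, not Airflow code: `run_with_exec` and `run_with_fork` are hypothetical helper names, and the fork variant assumes a POSIX system. It does not settle whether the new classes are required for the fork approach.

```python
import os
import subprocess
import sys
from typing import Callable, List


def run_with_exec(command: List[str]) -> int:
    """Start a new process for the command and return its exit code."""
    return subprocess.call(command)


def run_with_fork(task: Callable[[], None]) -> int:
    """Fork the current process and run the callable in the child (POSIX only)."""
    pid = os.fork()
    if pid == 0:
        # Child: run the task, then exit immediately without parent cleanup handlers.
        try:
            task()
        except Exception:
            os._exit(1)
        os._exit(0)
    # Parent: wait for the child and translate its status into an exit code.
    _, status = os.waitpid(pid, 0)
    return os.WEXITSTATUS(status) if os.WIFEXITED(status) else 1


exec_code = run_with_exec([sys.executable, "-c", "pass"])
fork_code = run_with_fork(lambda: None)
print(exec_code, fork_code)
```

The exec variant pays for a fresh interpreter start on every task, while the fork variant reuses the already-loaded parent process.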


[GitHub] [airflow] dimberman commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r364453845
 
 

 ##########
 File path: airflow/executors/base_executor.py
 ##########
 @@ -232,14 +255,14 @@ def get_event_buffer(self, dag_ids=None) -> Dict[TaskInstanceKeyType, Optional[s
 
     def execute_async(self,
                       key: TaskInstanceKeyType,
-                      command: CommandType,
+                      deferred_run: LocalTaskJobDeferredRun,
                       queue: Optional[str] = None,
 
 Review comment:
   I agree with @dazza-codes here. At first glance, the name LocalTaskJobDeferredRun has a LOT going on in it and is kind of hard to understand for people not well acquainted with the code.


[GitHub] [airflow] ashb commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r364653272
 
 

 ##########
 File path: UPDATING.md
 ##########
 @@ -57,6 +57,44 @@ https://developers.google.com/style/inclusive-documentation
 
 -->
 
+### Introduction of LocalTaskJobDeferred in the Executor.
+
+The executor uses ``LocalTaskJobDeferredRun`` instead of a list of strings with the command to be executed.
+All methods and fields that previously used the `command` parameter now use `deferred_run`.
+If your executor only extends non-implemented methods from BaseExecutor, you only need to update
+the ``execute_async`` method.
+
+The code below
+```diff
+    def execute_async(
+        self,
+        key: TaskInstanceKeyType,
+        command: CommandType,
+        queue: Optional[str] = None,
+        executor_config: Optional[Any] = None) -> None:
+
+        [...]
+
+        self.task_queue.put((key, command))
+```
+can be replaced by the following code:
+```diff
 
 Review comment:
   ```suggestion
   ```python
   ```


[GitHub] [airflow] ashb commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r364653974
 
 

 ##########
 File path: UPDATING.md
 ##########
 @@ -57,6 +57,44 @@ https://developers.google.com/style/inclusive-documentation
 
 -->
 
+### Introduction of LocalTaskJobDeferred in the Executor.
+
+The executor uses ``LocalTaskJobDeferredRun`` instead of a list of strings with the command to be executed.
 
 Review comment:
   ```suggestion
   The BaseExecutor now uses `LocalTaskJobDeferredRun` instead of a list of strings with the command to be executed.
   ```


[GitHub] [airflow] ashb commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r364658920
 
 

 ##########
 File path: airflow/models/queue_task_run.py
 ##########
 @@ -0,0 +1,206 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from airflow.utils import timezone
+
+
+class RawTaskDeferredRun:  # pylint: disable=too-many-instance-attributes
+    """
+    Generates the shell command required to execute this task instance.
+
+    :param dag_id: DAG ID
+    :type dag_id: unicode
+    :param task_id: Task ID
+    :type task_id: unicode
+    :param execution_date: Execution date for the task
+    :type execution_date: datetime.datetime
+    :param mark_success: Whether to mark the task as successful
+    :type mark_success: bool
+    :param pickle_id: If the DAG was serialized to the DB, the ID
+        associated with the pickled DAG
+    :type pickle_id: unicode
+    :param subdir: path to the file containing the DAG definition
+    :param job_id: job ID (needs more details)
+    :param pool: the Airflow pool that the task should run in
+    :type pool: unicode
+    :param cfg_path: the Path to the configuration file
+    :type cfg_path: str
+    :return: shell command that can be used to run the task instance
+    """
+    def __init__(  # pylint: disable=too-many-arguments
+        self,
+        dag_id,
+        task_id,
+        execution_date,
+        mark_success=None,
+        pickle_id=None,
+        job_id=None,
+        force=None,
+        pool=None,
+        subdir=None,
+        cfg_path=None,
+        mock_command=None,
+    ):
+        self.dag_id = dag_id
+        self.task_id = task_id
+        if isinstance(execution_date, str):
+            self.execution_date = timezone.parse(execution_date)
+        else:
+            self.execution_date = execution_date
+        self.mark_success = mark_success
+        self.pickle_id = pickle_id
+        self.job_id = job_id
+        self.force = force
+        self.pool = pool
+        self.subdir = subdir
+        self.cfg_path = cfg_path
+        self.mock_command = mock_command
+
+    def as_command(self):
+        """Generate CLI command"""
+        if self.mock_command:
+            return self.mock_command
+        iso = self.execution_date.isoformat()
+        cmd = ["airflow", "tasks", "run", str(self.dag_id), str(self.task_id), str(iso), "--raw"]
+        if self.mark_success:
+            cmd.extend(["--mark_success"])
+        if self.pickle_id:
+            cmd.extend(["--pickle", str(self.pickle_id)])
+        if self.job_id:
+            cmd.extend(["--job_id", str(self.job_id)])
+        if self.force:
+            cmd.extend(["--force"])
+        if self.pool:
+            cmd.extend(["--pool", self.pool])
+        if self.subdir:
+            cmd.extend(["--subdir", self.subdir])
+        if self.cfg_path:
+            cmd.extend(["--cfg_path", self.cfg_path])
+        return cmd
+
+    def __repr__(self):
+        iso = self.execution_date.isoformat()
+        return f"RawTaskDeferredRun(dag_id={self.dag_id}, task_id={self.task_id}, execution_date={iso})"
+
+
+class LocalTaskJobDeferredRun:  # pylint: disable=too-many-instance-attributes
+    """
+    Generates the shell command required to execute this task instance.
+
+    :param dag_id: DAG ID
+    :type dag_id: unicode
+    :param task_id: Task ID
+    :type task_id: unicode
+    :param execution_date: Execution date for the task
+    :type execution_date: datetime.datetime
+    :param mark_success: Whether to mark the task as successful
+    :type mark_success: bool
+    :param ignore_all_dependencies: Ignore all ignorable dependencies.
+        Overrides the other ignore_* parameters.
+    :type ignore_all_dependencies: bool
+    :param ignore_depends_on_past: Ignore depends_on_past parameter of DAGs
+        (e.g. for Backfills)
+    :type ignore_depends_on_past: bool
+    :param ignore_dependencies: Ignore task-specific dependencies such as depends_on_past
+        and trigger rule
+    :type ignore_dependencies: bool
+    :param force: Ignore the task instance's previous failure/success
+    :type force: bool
+    :param local: Whether to run the task locally
+    :type local: bool
+    :param pickle_id: If the DAG was serialized to the DB, the ID
+        associated with the pickled DAG
+    :type pickle_id: unicode
+    :param subdir: path to the file containing the DAG definition
+    :param raw: raw mode (needs more details)
+    :param job_id: job ID (needs more details)
+    :param pool: the Airflow pool that the task should run in
+    :type pool: unicode
+    :param cfg_path: the Path to the configuration file
+    :type cfg_path: str
+    :return: shell command that can be used to run the task instance
 
 Review comment:
   Classes don't have returns.


[GitHub] [airflow] ashb commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r364655454
 
 

 ##########
 File path: airflow/executors/base_executor.py
 ##########
 @@ -94,22 +96,43 @@ def queue_task_instance(
         # cfg_path is needed to propagate the config values if using impersonation
         # (run_as_user), given that there are different code paths running tasks.
         # For a long term solution we need to address AIRFLOW-1986
-        command_list_to_run = task_instance.command_as_list(
-            local=True,
+        deferred_run = task_instance.get_local_task_job_deferred_run(
             mark_success=mark_success,
             ignore_all_deps=ignore_all_deps,
             ignore_depends_on_past=ignore_depends_on_past,
             ignore_task_deps=ignore_task_deps,
             ignore_ti_state=ignore_ti_state,
             pool=pool,
             pickle_id=pickle_id,
-            cfg_path=cfg_path)
-        self.queue_command(
+            cfg_path=cfg_path,
+        )
+        self._queue_deferred_run(
             SimpleTaskInstance(task_instance),
-            command_list_to_run,
+            deferred_run,
             priority=task_instance.task.priority_weight_total,
             queue=task_instance.task.queue)
 
+    def queue_simple_task_instance(self, simple_task_instance: SimpleTaskInstance, simple_dag: SimpleDag):
+        """Queues simple task instance."""
+        priority = simple_task_instance.priority_weight
+        queue = simple_task_instance.queue
+
+        queue_task_run = LocalTaskJobDeferredRun(
+            dag_id=simple_task_instance.dag_id,
+            task_id=simple_task_instance.task_id,
+            execution_date=simple_task_instance.execution_date,
+            pool=simple_task_instance.pool,
+            subdir=simple_dag.full_filepath,
+            pickle_id=simple_dag.pickle_id
+        )
+
+        self._queue_deferred_run(
+            simple_task_instance,
+            queue_task_run,
+            priority=priority,
+            queue=queue
+        )
+
 
 Review comment:
   Task = Operators in your DAG
   TaskInstance = an "instance" of that operator for a given execution date/dag run.
   
   Yes it's slightly odd naming, but it does make some sense.
   
   As for the point about having two classes: yes. @kaxil and I are working on removing the "Simple" versions as part of our dag serialization work so I think this is fine for now, and anyway removing that class should be its own PR.


[GitHub] [airflow] mik-laj removed a comment on issue #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
mik-laj removed a comment on issue #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#issuecomment-571307257
 
 
   Does anyone have an idea on how to change the order of commits in the "Commits" tab? I have one order in the console and a different order on GH.
   ```
   2229be226 [AIRFLOW-6334] Use QueueTaskRun instead command in executors
   ad0aafb40 Make queue_command private
   17db1b155 Remove queue_task_instance in DebugExecutor
   cf74768f0 Extract LocalTaskJobDeferredRun from QueueTaskRun class
   08e66e24f Use get_queue_task_run instead of command_as_list in BaseTaskRunner
   47d0341a5 Remove unused parameters in QueueTaskRun
   4d8658b91 Rename QueueTaskRun class to RawTaskDeferredRun
   2a28de87c Rename parameters and variables
   23ed55e2c Add UPDATE.md note
   ```
   


[GitHub] [airflow] mik-laj commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r364684848
 
 

 ##########
 File path: airflow/executors/base_executor.py
 ##########
 @@ -94,22 +96,43 @@ def queue_task_instance(
         # cfg_path is needed to propagate the config values if using impersonation
         # (run_as_user), given that there are different code paths running tasks.
         # For a long term solution we need to address AIRFLOW-1986
-        command_list_to_run = task_instance.command_as_list(
-            local=True,
+        deferred_run = task_instance.get_local_task_job_deferred_run(
             mark_success=mark_success,
             ignore_all_deps=ignore_all_deps,
             ignore_depends_on_past=ignore_depends_on_past,
             ignore_task_deps=ignore_task_deps,
             ignore_ti_state=ignore_ti_state,
             pool=pool,
             pickle_id=pickle_id,
-            cfg_path=cfg_path)
-        self.queue_command(
+            cfg_path=cfg_path,
+        )
+        self._queue_deferred_run(
             SimpleTaskInstance(task_instance),
-            command_list_to_run,
+            deferred_run,
             priority=task_instance.task.priority_weight_total,
             queue=task_instance.task.queue)
 
+    def queue_simple_task_instance(self, simple_task_instance: SimpleTaskInstance, simple_dag: SimpleDag):
+        """Queues simple task instance."""
+        priority = simple_task_instance.priority_weight
+        queue = simple_task_instance.queue
+
+        queue_task_run = LocalTaskJobDeferredRun(
+            dag_id=simple_task_instance.dag_id,
+            task_id=simple_task_instance.task_id,
+            execution_date=simple_task_instance.execution_date,
+            pool=simple_task_instance.pool,
+            subdir=simple_dag.full_filepath,
+            pickle_id=simple_dag.pickle_id
+        )
+
+        self._queue_deferred_run(
+            simple_task_instance,
+            queue_task_run,
+            priority=priority,
+            queue=queue
+        )
+
 
 Review comment:
   Task = instance of operator. This is a Python object only. 
   TaskInstance = Task instance at a specific time. It's also a database entity.
   
   I also think that SimpleTaskInstance should be deleted in a separate PR.
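
   The Task / TaskInstance distinction described in the two comments above can be pictured roughly as follows. This is only an illustrative sketch: the two dataclasses are simplified stand-ins and not Airflow's real classes, and the DAG and task names are made up.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class Task:
    """An operator placed in a DAG (illustrative stand-in only)."""
    dag_id: str
    task_id: str


@dataclass(frozen=True)
class TaskInstance:
    """One run of a Task for a given execution date (illustrative stand-in only)."""
    task: Task
    execution_date: datetime


# One task in the DAG yields one task instance per execution date.
extract = Task("example_dag", "extract")
runs = [TaskInstance(extract, datetime(2020, 1, day)) for day in (6, 7)]
```

   Both instances share the same `Task`, and only the execution date tells them apart.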


[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
nuclearpinguin commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r363744138
 
 

 ##########
 File path: airflow/models/queue_task_run.py
 ##########
 @@ -0,0 +1,206 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from airflow.utils import timezone
+
+
+class RawTaskDeferredRun:  # pylint: disable=too-many-instance-attributes
+    """
+    Generates the shell command required to execute this task instance.
+
+    :param dag_id: DAG ID
+    :type dag_id: unicode
+    :param task_id: Task ID
+    :type task_id: unicode
+    :param execution_date: Execution date for the task
+    :type execution_date: datetime.datetime
+    :param mark_success: Whether to mark the task as successful
+    :type mark_success: bool
+    :param pickle_id: If the DAG was serialized to the DB, the ID
+        associated with the pickled DAG
+    :type pickle_id: unicode
+    :param subdir: path to the file containing the DAG definition
+    :param job_id: job ID (needs more details)
+    :param pool: the Airflow pool that the task should run in
+    :type pool: unicode
+    :param cfg_path: the Path to the configuration file
+    :type cfg_path: str
+    :return: shell command that can be used to run the task instance
+    """
+    def __init__(  # pylint: disable=too-many-arguments
+        self,
+        dag_id,
+        task_id,
+        execution_date,
+        mark_success=None,
+        pickle_id=None,
+        job_id=None,
+        force=None,
+        pool=None,
+        subdir=None,
+        cfg_path=None,
+        mock_command=None,
+    ):
+        self.dag_id = dag_id
+        self.task_id = task_id
+        if isinstance(execution_date, str):
+            self.execution_date = timezone.parse(execution_date)
+        else:
+            self.execution_date = execution_date
+        self.mark_success = mark_success
+        self.pickle_id = pickle_id
+        self.job_id = job_id
+        self.force = force
+        self.pool = pool
+        self.subdir = subdir
+        self.cfg_path = cfg_path
+        self.mock_command = mock_command
+
+    def as_command(self):
+        """Generate CLI command"""
+        if self.mock_command:
+            return self.mock_command
+        iso = self.execution_date.isoformat()
+        cmd = ["airflow", "tasks", "run", str(self.dag_id), str(self.task_id), str(iso), "--raw"]
+        if self.mark_success:
+            cmd.extend(["--mark_success"])
+        if self.pickle_id:
+            cmd.extend(["--pickle", str(self.pickle_id)])
+        if self.job_id:
+            cmd.extend(["--job_id", str(self.job_id)])
+        if self.force:
+            cmd.extend(["--force"])
+        if self.pool:
+            cmd.extend(["--pool", self.pool])
+        if self.subdir:
+            cmd.extend(["--subdir", self.subdir])
+        if self.cfg_path:
+            cmd.extend(["--cfg_path", self.cfg_path])
+        return cmd
+
+    def __repr__(self):
+        iso = self.execution_date.isoformat()
+        return f"RawTaskDeferredRun(dag_id={self.dag_id}, task_id={self.task_id}, execution_date={iso})"
+
+
+class LocalTaskJobDeferredRun:  # pylint: disable=too-many-instance-attributes
+    """
+    Generates the shell command required to execute this task instance.
 
 Review comment:
   I am not sure this is the right description for the class; IMHO it does not explain the purpose of the class.


[GitHub] [airflow] dazza-codes commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
dazza-codes commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r363898046
 
 

 ##########
 File path: airflow/executors/base_executor.py
 ##########
 @@ -232,14 +255,14 @@ def get_event_buffer(self, dag_ids=None) -> Dict[TaskInstanceKeyType, Optional[s
 
     def execute_async(self,
                       key: TaskInstanceKeyType,
-                      command: CommandType,
+                      deferred_run: LocalTaskJobDeferredRun,
                       queue: Optional[str] = None,
 
 Review comment:
   nit: the number of concepts in the name `LocalTaskJobDeferredRun` raises many questions about what the heck that thing is, and about how the whole OOP design is evolving such that a combination of so many concepts is necessary in one class name. E.g., why is it not simply `DeferredTask` or `LocalTask` or `QueuedTask` (where `Queued` implies `DeferredRun`)? What is the difference between `Task` and `Job`, and why does this one class capture both concepts at the same time? (Of course, happy to take this nit-pick offline, where I might get educated about all the things that make this combination of concepts necessary.)


[GitHub] [airflow] ashb commented on issue #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#issuecomment-572493810
 
 
   >     * Backward compatibility. I do not want to cause more incompatibility than necessary. The update path is now relatively short. See UPDATING.md
   
   This is not a big concern to me -- the number of people who have written their own executor that lives outside of Airflow could be counted on one or two hands, so I would be happy saying "The interface for Executors has changed from taking a command to X - see base_executor.py for the new interface".


[GitHub] [airflow] ashb commented on issue #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#issuecomment-572494156
 
 
   > In the future I would like to completely get rid of the `as_command` method if no executor uses it anymore, but this probably won't happen because KubernetesExecutor will still require it.
   
   That says to me that it should live inside the KubeExecutor class?
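
   One way to read this suggestion, as a minimal sketch rather than the PR's actual code: the deferred-run object stays a plain data holder, and the executor that still needs a CLI invocation builds the command itself. `DeferredRun` and `build_cli_command` are hypothetical names, and the flags shown are a subset of those in the `as_command` method quoted in this thread.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class DeferredRun:
    """Hypothetical data-only stand-in for LocalTaskJobDeferredRun."""
    dag_id: str
    task_id: str
    execution_date: datetime
    pool: Optional[str] = None
    subdir: Optional[str] = None
    pickle_id: Optional[int] = None


def build_cli_command(run: DeferredRun) -> List[str]:
    """Assumed helper that would live in the executor needing a CLI command."""
    cmd = ["airflow", "tasks", "run", run.dag_id, run.task_id,
           run.execution_date.isoformat()]
    if run.pickle_id:
        cmd.extend(["--pickle", str(run.pickle_id)])
    if run.pool:
        cmd.extend(["--pool", run.pool])
    if run.subdir:
        cmd.extend(["--subdir", run.subdir])
    return cmd


example = DeferredRun("example_dag", "example_task", datetime(2020, 1, 6),
                      pool="default_pool")
command = build_cli_command(example)
```

   With this split, executors that never shell out would not carry any command-building code at all.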


[GitHub] [airflow] mik-laj commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r364697817
 
 

 ##########
 File path: airflow/executors/base_executor.py
 ##########
 @@ -232,14 +255,14 @@ def get_event_buffer(self, dag_ids=None) -> Dict[TaskInstanceKeyType, Optional[s
 
     def execute_async(self,
                       key: TaskInstanceKeyType,
-                      command: CommandType,
+                      deferred_run: LocalTaskJobDeferredRun,
                       queue: Optional[str] = None,
 
 Review comment:
   @ashb We need to store startup configuration information. TaskInstance and SimpleTaskInstance do not store this kind of information. These objects are created in a completely different place and I would not like to mix classes. I like the single-responsibility principle.
   
   What do you think about "QueueTaskInstance"? It sounds strange, but it makes sense in my opinion.
   TaskInstance -> Database entity.
   SimpleTaskInstance -> Picklable representation of TaskInstance. It is used for communication between DagProcessorAgent and Scheduler.
   QueueTaskInstance -> TaskInstance identifier with run configuration. This is used for "communication" between Executor and LocalTaskJob.
   
   I think the RawTaskDeferredRun class can be combined with TaskRunner. We do not need to have a class, because we immediately replace the object with a command. Does it make sense to you?
   
   CC: @nuclearpinguin 


[GitHub] [airflow] ashb commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r364653216
 
 

 ##########
 File path: UPDATING.md
 ##########
 @@ -57,6 +57,44 @@ https://developers.google.com/style/inclusive-documentation
 
 -->
 
+### Introduction of LocalTaskJobDeferred in the Executor.
+
+The executor uses ``LocalTaskJobDeferredRun`` instead of a list of strings with the command to be executed.
+All methods and fields that previously used the `command` parameter now use `deferred_run`.
+If your executor only extends non-implemented methods from BaseExecutor, you only need to update
+the ``execute_async`` method.
+
+The code below
+```diff
 
 Review comment:
   ```suggestion
   ```python
   ```


[GitHub] [airflow] mik-laj commented on issue #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
mik-laj commented on issue #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#issuecomment-572562252
 
 
   @ashb Maybe in the future. For now, most executors use the CLI.


[GitHub] [airflow] mik-laj commented on issue #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
mik-laj commented on issue #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#issuecomment-572484694
 
 
   > So... question... If we're going to go through all this trouble in the first place, why not just do away with the whole command-line based workflow overall? We're taking so much time to find cleaner command generators when I think we should consider new designs that are easier to debug.
   
   For a few reasons:
   * Some executors will still require the CLI, e.g. KubernetesExecutor.
   * Backward compatibility. I do not want to cause more incompatibility than necessary. The update path is now relatively short. See UPDATING.md
   * I want to limit the scope of this PR to only the necessary changes. In particular, I wanted to focus on BaseExecutor. Other executors can then be optimized to take advantage of the new possibilities that this PR will give, but this must be analyzed for each executor separately because it can create side effects.
   
   In the future I would like to completely get rid of the `as_command` method if no executor uses it anymore, but this probably won't happen because KubernetesExecutor will still require it.
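
   To illustrate the kind of "new possibility" meant here, below is a rough sketch of an executor that receives the deferred-run object and never builds a CLI command. Everything in it is an assumption made for illustration: `InProcessExecutor`, the `run_task` callable, the `Key` alias and the `sync` loop are hypothetical and are not taken from the PR; only the `execute_async` parameter names follow the signature quoted in this thread.

```python
from queue import Queue
from typing import Any, Callable, Dict, Optional, Tuple

Key = Tuple[str, str, str]  # hypothetical stand-in for TaskInstanceKeyType


class InProcessExecutor:
    """Hypothetical executor that consumes deferred runs without the CLI."""

    def __init__(self, run_task: Callable[[Any], None]) -> None:
        self.run_task = run_task  # assumed callable that runs one deferred run
        self.task_queue: "Queue[Tuple[Key, Any]]" = Queue()
        self.results: Dict[Key, str] = {}

    def execute_async(self, key: Key, deferred_run: Any,
                      queue: Optional[str] = None,
                      executor_config: Optional[Any] = None) -> None:
        # The object itself is queued; no list of strings is built.
        self.task_queue.put((key, deferred_run))

    def sync(self) -> None:
        # Drain the queue and record a simple state per key.
        while not self.task_queue.empty():
            key, deferred_run = self.task_queue.get()
            try:
                self.run_task(deferred_run)
                self.results[key] = "success"
            except Exception:
                self.results[key] = "failed"


seen = []
executor = InProcessExecutor(run_task=seen.append)
executor.execute_async(("example_dag", "example_task", "2020-01-06"),
                       {"pool": "default_pool"})
executor.sync()
```

   An executor that still needs the CLI would convert the same object into a command at the point where it starts the process.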
   


[GitHub] [airflow] mik-laj commented on issue #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
mik-laj commented on issue #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#issuecomment-571307257
 
 
   Does anyone have an idea on how to change the order of commits in the "Commits" tab? I have one order in the console and a different order on GH.
   ```
   2229be226 [AIRFLOW-6334] Use QueueTaskRun instead command in executors
   ad0aafb40 Make queue_command private
   17db1b155 Remove queue_task_instance in DebugExecutor
   cf74768f0 Extract LocalTaskJobDeferredRun from QueueTaskRun class
   08e66e24f Use get_queue_task_run instead of command_as_list in BaseTaskRunner
   47d0341a5 Remove unused parameters in QueueTaskRun
   4d8658b91 Rename QueueTaskRun class to RawTaskDeferredRun
   2a28de87c Rename parameters and variables
   23ed55e2c Add UPDATE.md note
   ```
   


[GitHub] [airflow] ashb commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r364721927
 
 

 ##########
 File path: airflow/executors/base_executor.py
 ##########
 @@ -232,14 +255,14 @@ def get_event_buffer(self, dag_ids=None) -> Dict[TaskInstanceKeyType, Optional[s
 
     def execute_async(self,
                       key: TaskInstanceKeyType,
-                      command: CommandType,
+                      deferred_run: LocalTaskJobDeferredRun,
                       queue: Optional[str] = None,
 
 Review comment:
   Oh, and I think the class should live under airflow.executors instead of airflow.models -- it's not stored in the DB, which _most_ of the classes in that module are.


[GitHub] [airflow] dazza-codes commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
dazza-codes commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r363904383
 
 

 ##########
 File path: airflow/executors/celery_executor.py
 ##########
 @@ -22,15 +22,16 @@
 import time
 import traceback
 from multiprocessing import Pool, cpu_count
-from typing import Any, List, Optional, Tuple, Union
+from typing import Any, Dict, List, Optional, Tuple, Union
 
 from celery import Celery, Task, states as celery_states
 from celery.result import AsyncResult
 
 from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.executors.base_executor import BaseExecutor, CommandType
+from airflow.executors.base_executor import BaseExecutor
+from airflow.models.queue_task_run import LocalTaskJobDeferredRun
 from airflow.models.taskinstance import SimpleTaskInstance, TaskInstanceKeyType, TaskInstanceStateType
 
 Review comment:
   - If `queue_task_run` owns the new class, that implies a class name more like `QueuedTask` (forget about `run`, because that's implied anyway, isn't it?). To avoid combining `Task` with `Job`, it could instead be `QueuedJob`, or `LocalQueuedJob` as a subclass of `QueuedJob`.
   - `SimpleTaskInstance` is an instance of a task that could recur on a schedule. I wonder whether `Instance` and `Job` overlap, how those terms relate to scheduled tasks, task instances and task retries, and whether a `job` is just one of the task-instance tries (my best guess at how these concepts chain together). That might lead to `SimpleTask` and `SimpleJob` as the simplest basic distinction: `SimpleTask` captures all task attributes, including schedule, N-tries, etc., and `SimpleJob` is a particular "task-try" that could have variations depending on the execution model, like a `QueuedJob`. I'm just trying to avoid combinations of all the things, as in `LocalTaskJobDeferredRun`.
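
To make the naming proposal concrete, here is a rough sketch of how the suggested classes might relate. Every field name below (`schedule`, `max_tries`, `try_number`, `queue`) is an assumption made for illustration; none of these classes exist in Airflow, and the PR itself uses `LocalTaskJobDeferredRun`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SimpleTask:
    """Hypothetical: all task-level attributes, including schedule and retry budget."""
    dag_id: str
    task_id: str
    schedule: Optional[str] = None  # assumed field, e.g. a cron expression
    max_tries: int = 1              # assumed field for "N-tries"


@dataclass(frozen=True)
class SimpleJob:
    """Hypothetical: one particular try of a task."""
    task: SimpleTask
    try_number: int


@dataclass(frozen=True)
class QueuedJob(SimpleJob):
    """Hypothetical: a job variant for executors that place work on a queue."""
    queue: Optional[str] = None


task = SimpleTask(dag_id="example_dag", task_id="example_task", max_tries=3)
job = QueuedJob(task=task, try_number=1, queue="default")
```

With this split, the executor-facing object is always some kind of job, and the task it points at stays free of execution details.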


[GitHub] [airflow] ashb commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r364657264
 
 

 ##########
 File path: airflow/executors/base_executor.py
 ##########
 @@ -232,14 +255,14 @@ def get_event_buffer(self, dag_ids=None) -> Dict[TaskInstanceKeyType, Optional[s
 
     def execute_async(self,
                       key: TaskInstanceKeyType,
-                      command: CommandType,
+                      deferred_run: LocalTaskJobDeferredRun,
                       queue: Optional[str] = None,
 
 Review comment:
   @mik-laj Here's an idea: do we actually need a whole new class, or could we instead do something like `executor.execute_async(task_instance)`?
   
   Since the job of the executor is to execute tasks, not to run arbitrary commands, why not just pass it the entire TI object and let the executor work out what it needs to do? Thinking about it now, passing down a command to `airflow run`/`airflow tasks run` instead of the TI itself feels like a very leaky abstraction.
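
A minimal sketch of what that interface could look like, under stated assumptions: `SketchTaskInstance`, `SubprocessStyleExecutor` and `InProcessStyleExecutor` are hypothetical stand-ins and not Airflow classes, and the argument layout of the generated command is only illustrative.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Dict, List, Optional, Tuple

# Hypothetical key type, loosely modelled on TaskInstanceKeyType.
KeyType = Tuple[str, str, datetime, int]


@dataclass(frozen=True)
class SketchTaskInstance:
    """Hypothetical stand-in for a task instance; not Airflow's class."""
    dag_id: str
    task_id: str
    execution_date: datetime
    try_number: int = 1

    @property
    def key(self) -> KeyType:
        return (self.dag_id, self.task_id, self.execution_date, self.try_number)


class SubprocessStyleExecutor:
    """Derives a command from the task instance only because it needs one."""

    def __init__(self) -> None:
        self.commands: List[List[str]] = []

    def execute_async(self, task_instance: SketchTaskInstance,
                      queue: Optional[str] = None) -> None:
        # Illustrative argument layout; a real executor would hand this
        # list to something like subprocess.check_call instead of storing it.
        command = [
            "airflow", "tasks", "run",
            task_instance.dag_id,
            task_instance.task_id,
            task_instance.execution_date.isoformat(),
        ]
        self.commands.append(command)


class InProcessStyleExecutor:
    """Runs the task through a callable and never builds a command."""

    def __init__(self, run_task: Callable[[SketchTaskInstance], str]) -> None:
        self.run_task = run_task
        self.states: Dict[KeyType, str] = {}

    def execute_async(self, task_instance: SketchTaskInstance,
                      queue: Optional[str] = None) -> None:
        self.states[task_instance.key] = self.run_task(task_instance)
```

Both executors accept the same argument; only the one that spawns a process ever turns it into an argument list.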


[GitHub] [airflow] mik-laj commented on issue #7085: [AIRFLOW-6334] Use classes instead list of string in executors

Posted by GitBox <gi...@apache.org>.
mik-laj commented on issue #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#issuecomment-571310474
 
 
   Does anyone have an idea where the new classes should live? `airflow.executors` or `airflow.jobs`?
