Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/05/24 20:06:27 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

ephraimbuddy opened a new pull request #16030:
URL: https://github.com/apache/airflow/pull/16030


   This change removes execution_date from the `airflow tasks` commands and uses DagRun.run_id instead.
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16030:
URL: https://github.com/apache/airflow/pull/16030#discussion_r643381620



##########
File path: tests/cli/commands/test_task_command.py
##########
@@ -62,6 +62,7 @@ class TestCliTasks(unittest.TestCase):
     def setUpClass(cls):
         cls.dagbag = DagBag(include_examples=True)
         cls.parser = cli_parser.get_parser()
+        clear_db_runs()

Review comment:
       We should also call this on `tearDown()` to keep things tidy for other tests.

##########
File path: airflow/cli/commands/task_command.py
##########
@@ -42,9 +45,26 @@
     get_dags,
     suppress_logs_and_warning,
 )
+from airflow.utils.dates import timezone
 from airflow.utils.log.logging_mixin import StreamLogWriter
 from airflow.utils.net import get_hostname
-from airflow.utils.session import create_session
+from airflow.utils.session import create_session, provide_session
+
+
+def _get_ti(dag, task, exec_date_or_run_id):
+    """Get the task instance through DagRun.run_id, if that fails, get the TI the old way"""
+    dag_run = dag.get_dagrun(run_id=exec_date_or_run_id)
+    if not dag_run:
+        try:
+            execution_date = timezone.parse(exec_date_or_run_id)
+            # print("Warning: execution_date will be removed in
+            # tasks command in the future. Please use DagRun.run_id")

Review comment:
       Is this a TODO for the future?

##########
File path: airflow/cli/commands/task_command.py
##########
@@ -42,9 +45,26 @@
     get_dags,
     suppress_logs_and_warning,
 )
+from airflow.utils.dates import timezone
 from airflow.utils.log.logging_mixin import StreamLogWriter
 from airflow.utils.net import get_hostname
-from airflow.utils.session import create_session
+from airflow.utils.session import create_session, provide_session
+
+
+def _get_ti(dag, task, exec_date_or_run_id):
+    """Get the task instance through DagRun.run_id, if that fails, get the TI the old way"""
+    dag_run = dag.get_dagrun(run_id=exec_date_or_run_id)
+    if not dag_run:
+        try:
+            execution_date = timezone.parse(exec_date_or_run_id)
+            # print("Warning: execution_date will be removed in
+            # tasks command in the future. Please use DagRun.run_id")
+            ti = TaskInstance(task, execution_date)
+            return ti
+        except (ParserError, TypeError):
+            raise AirflowException(f"DagRun with run_id: {exec_date_or_run_id} not found")
+    ti = TaskInstance(task, execution_date=dag_run.execution_date)
+    return ti

Review comment:
       This one tries to use the argument as a run_id first and falls back to parsing it as an execution date otherwise, but…

##########
File path: airflow/models/dag.py
##########
@@ -905,22 +905,24 @@ def get_num_active_runs(self, external_trigger=None, session=None):
         return query.scalar()
 
     @provide_session
-    def get_dagrun(self, execution_date, session=None):
+    def get_dagrun(self, execution_date: str = None, run_id: str = None, session=None):

Review comment:
       If the argument could be None, the type should be `Optional[str]`. (In this particular case, we can actually set the default to `""` since both the execution date and run_id cannot be empty, but that might not be as intuitive to read.)
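
A short sketch of the annotation style described above, assuming a hypothetical `describe_lookup` function that is not part of Airflow. It only illustrates marking parameters that default to `None` as `Optional[str]`.

```python
from typing import Optional


def describe_lookup(execution_date: Optional[str] = None, run_id: Optional[str] = None) -> str:
    """Report which key a lookup would use; either parameter may be omitted."""
    if run_id is not None:
        return f"run_id={run_id}"
    if execution_date is not None:
        return f"execution_date={execution_date}"
    return "no key given"
```

Type checkers then accept calls such as `describe_lookup(run_id="manual__1")` as well as calls that pass `None` explicitly.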

##########
File path: airflow/cli/commands/task_command.py
##########
@@ -321,38 +340,39 @@ def _guess_debugger():
 
 @cli_utils.action_logging
 @suppress_logs_and_warning
-def task_states_for_dag_run(args):
+@provide_session
+def task_states_for_dag_run(args, session=None):
     """Get the status of all task instances in a DagRun"""
-    with create_session() as session:
-        tis = (
-            session.query(
-                TaskInstance.dag_id,
-                TaskInstance.execution_date,
-                TaskInstance.task_id,
-                TaskInstance.state,
-                TaskInstance.start_date,
-                TaskInstance.end_date,
-            )
-            .filter(TaskInstance.dag_id == args.dag_id, TaskInstance.execution_date == args.execution_date)
-            .all()
+    try:
+        execution_date = timezone.parse(args.execution_date_or_run_id)
+        dag_run = (
+            session.query(DagRun)
+            .filter(DagRun.execution_date == execution_date, DagRun.dag_id == args.dag_id)
+            .one_or_none()
         )
-
-        if len(tis) == 0:
-            raise AirflowException("DagRun does not exist.")
-
-        AirflowConsole().print_as(
-            data=tis,
-            output=args.output,
-            mapper=lambda ti: {
-                "dag_id": ti.dag_id,
-                "execution_date": ti.execution_date.isoformat(),
-                "task_id": ti.task_id,
-                "state": ti.state,
-                "start_date": ti.start_date.isoformat() if ti.start_date else "",
-                "end_date": ti.end_date.isoformat() if ti.end_date else "",
-            },
+    except (ParserError, TypeError):
+        dag_run = (
+            session.query(DagRun)
+            .filter(DagRun.run_id == args.execution_date_or_run_id, DagRun.dag_id == args.dag_id)
+            .one_or_none()

Review comment:
       The logic here is the other way around: it tries to parse the argument as an execution date first and looks it up as a run_id later. This feels wrong to me; we should commit to doing this one way or the other, and use a common function for it. There is also potentially an edge case if a run_id can somehow be parsed as a datetime string (admittedly extremely unlikely), which would cause pretty terrible internal inconsistencies.
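
One way to picture the "common function" idea is the sketch below. It assumes an in-memory dictionary in place of the DagRun table and uses the standard library's `datetime.fromisoformat` rather than Airflow's `timezone.parse`; `DAG_RUNS` and `find_run_id` are hypothetical names, not code from the PR.

```python
from datetime import datetime
from typing import Dict, Optional

# Hypothetical stand-in for the DagRun table: run_id -> execution date.
DAG_RUNS: Dict[str, datetime] = {
    "manual__2021-05-24": datetime(2021, 5, 24),
    "scheduled__2021-05-25": datetime(2021, 5, 25),
}


def find_run_id(exec_date_or_run_id: str) -> Optional[str]:
    """Resolve the argument as a run_id first, then fall back to an execution date."""
    # Step 1: an exact run_id match always wins, so a run_id that happens to
    # look like a date is never reinterpreted.
    if exec_date_or_run_id in DAG_RUNS:
        return exec_date_or_run_id
    # Step 2: only when no run_id matches, try the legacy execution-date form.
    try:
        execution_date = datetime.fromisoformat(exec_date_or_run_id)
    except ValueError:
        return None
    for run_id, run_date in DAG_RUNS.items():
        if run_date == execution_date:
            return run_id
    return None
```

If every command resolved its argument through a single helper like this, the lookup order could not differ between commands.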







[GitHub] [airflow] ephraimbuddy commented on pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16030:
URL: https://github.com/apache/airflow/pull/16030#issuecomment-847328010


   > It looks like a breaking change. Am I right?
   
   I can't really tell right now. From what I understand, the execution date sent to the tasks command is not really used in the context of execution date but to just get TaskInstance. I will shed more light as I work on this.





[GitHub] [airflow] ashb commented on pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #16030:
URL: https://github.com/apache/airflow/pull/16030#issuecomment-858677730


   @ephraimbuddy Looks like we've got some test failures here to look at 😢 





[GitHub] [airflow] uranusjr commented on a change in pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16030:
URL: https://github.com/apache/airflow/pull/16030#discussion_r643379007



##########
File path: airflow/cli/commands/task_command.py
##########
@@ -42,9 +45,26 @@
     get_dags,
     suppress_logs_and_warning,
 )
+from airflow.utils.dates import timezone
 from airflow.utils.log.logging_mixin import StreamLogWriter
 from airflow.utils.net import get_hostname
-from airflow.utils.session import create_session
+from airflow.utils.session import create_session, provide_session
+
+
+def _get_ti(dag, task, exec_date_or_run_id):
+    """Get the task instance through DagRun.run_id, if that fails, get the TI the old way"""
+    dag_run = dag.get_dagrun(run_id=exec_date_or_run_id)
+    if not dag_run:
+        try:
+            execution_date = timezone.parse(exec_date_or_run_id)
+            # print("Warning: execution_date will be removed in
+            # tasks command in the future. Please use DagRun.run_id")

Review comment:
       Is this a TODO for the future?







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16030:
URL: https://github.com/apache/airflow/pull/16030#discussion_r643565053



##########
File path: airflow/cli/commands/task_command.py
##########
@@ -42,9 +45,26 @@
     get_dags,
     suppress_logs_and_warning,
 )
+from airflow.utils.dates import timezone
 from airflow.utils.log.logging_mixin import StreamLogWriter
 from airflow.utils.net import get_hostname
-from airflow.utils.session import create_session
+from airflow.utils.session import create_session, provide_session
+
+
+def _get_ti(dag, task, exec_date_or_run_id):

Review comment:
       It's actually the task itself







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16030:
URL: https://github.com/apache/airflow/pull/16030#discussion_r643547588



##########
File path: airflow/cli/commands/task_command.py
##########
@@ -42,9 +45,26 @@
     get_dags,
     suppress_logs_and_warning,
 )
+from airflow.utils.dates import timezone
 from airflow.utils.log.logging_mixin import StreamLogWriter
 from airflow.utils.net import get_hostname
-from airflow.utils.session import create_session
+from airflow.utils.session import create_session, provide_session
+
+
+def _get_ti(dag, task, exec_date_or_run_id):
+    """Get the task instance through DagRun.run_id, if that fails, get the TI the old way"""
+    dag_run = dag.get_dagrun(run_id=exec_date_or_run_id)
+    if not dag_run:
+        try:
+            execution_date = timezone.parse(exec_date_or_run_id)
+            # print("Warning: execution_date will be removed in
+            # tasks command in the future. Please use DagRun.run_id")

Review comment:
       I'm not sure it's OK to print it right now. @ash







[GitHub] [airflow] ephraimbuddy edited a comment on pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

Posted by GitBox <gi...@apache.org>.
ephraimbuddy edited a comment on pull request #16030:
URL: https://github.com/apache/airflow/pull/16030#issuecomment-847328010


   > It looks like a breaking change. Am I right?
   
   I can't really tell right now. From what I understand, the execution date sent to the tasks command is not really used in the context of execution date but to just get TaskInstance. I will shed more light as I work on this.
   
   Update:
   I have allowed backward compatibility  @mik-laj 





[GitHub] [airflow] ashb edited a comment on pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

Posted by GitBox <gi...@apache.org>.
ashb edited a comment on pull request #16030:
URL: https://github.com/apache/airflow/pull/16030#issuecomment-858677730


   @ephraimbuddy Looks like we've got some test failures here to look at 😢 
   
   ```
     [2021-06-10 00:49:47,288] {local_task_job.py:197} WARNING - State of this instance has been externally set to None. Terminating instance.
     Running <TaskInstance: test_mark_failure.test_on_failure 2016-01-01T00:00:00+00:00 [failed]> on host d86bc15dcf39
     [2021-06-10 00:49:48,376] {local_task_job.py:197} WARNING - State of this instance has been externally set to failed. Terminating instance.
   ```
   
   Hmmm!





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16030:
URL: https://github.com/apache/airflow/pull/16030#discussion_r646595504



##########
File path: airflow/cli/commands/task_command.py
##########
@@ -42,9 +45,27 @@
     get_dags,
     suppress_logs_and_warning,
 )
+from airflow.utils.dates import timezone
 from airflow.utils.log.logging_mixin import StreamLogWriter
 from airflow.utils.net import get_hostname
-from airflow.utils.session import create_session
+from airflow.utils.session import create_session, provide_session
+
+
+def _get_ti(task, exec_date_or_run_id):
+    """Get the task instance through DagRun.run_id, if that fails, get the TI the old way"""
+    dag_run = task.dag.get_dagrun(run_id=exec_date_or_run_id)
+    if not dag_run:
+        try:
+            execution_date = timezone.parse(exec_date_or_run_id)
+            ti = TaskInstance(task, execution_date)
+            ti.refresh_from_db()
+            return ti
+        except (ParserError, TypeError):
+            raise AirflowException(f"DagRun with run_id: {exec_date_or_run_id} not found")
+    ti = dag_run.get_task_instance(task.task_id)
+    ti.task = task
+    ti.refresh_from_db()

Review comment:
       You are right. We only need it in L60







[GitHub] [airflow] kaxil commented on a change in pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16030:
URL: https://github.com/apache/airflow/pull/16030#discussion_r646565230



##########
File path: airflow/cli/commands/task_command.py
##########
@@ -24,6 +24,8 @@
 from contextlib import contextmanager, redirect_stderr, redirect_stdout
 from typing import List
 
+from pendulum.parsing import ParserError

Review comment:
       ```suggestion
   from pendulum.parsing.exceptions import ParserError
   ```







[GitHub] [airflow] ephraimbuddy closed pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed pull request #16030:
URL: https://github.com/apache/airflow/pull/16030


   





[GitHub] [airflow] ashb commented on a change in pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16030:
URL: https://github.com/apache/airflow/pull/16030#discussion_r645969935



##########
File path: airflow/cli/commands/task_command.py
##########
@@ -42,9 +45,26 @@
     get_dags,
     suppress_logs_and_warning,
 )
+from airflow.utils.dates import timezone
 from airflow.utils.log.logging_mixin import StreamLogWriter
 from airflow.utils.net import get_hostname
-from airflow.utils.session import create_session
+from airflow.utils.session import create_session, provide_session
+
+
+def _get_ti(dag, task, exec_date_or_run_id):
+    """Get the task instance through DagRun.run_id, if that fails, get the TI the old way"""
+    dag_run = dag.get_dagrun(run_id=exec_date_or_run_id)
+    if not dag_run:
+        try:
+            execution_date = timezone.parse(exec_date_or_run_id)
+            # print("Warning: execution_date will be removed in
+            # tasks command in the future. Please use DagRun.run_id")
+            ti = TaskInstance(task, execution_date)
+            return ti
+        except (ParserError, TypeError):
+            raise AirflowException(f"DagRun with run_id: {exec_date_or_run_id} not found")
+    ti = TaskInstance(task, execution_date=dag_run.execution_date)

Review comment:
       ```suggestion
       ti = dag_run.get_task_instance(task.task_id)
   ```







[GitHub] [airflow] ashb commented on a change in pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16030:
URL: https://github.com/apache/airflow/pull/16030#discussion_r643388171



##########
File path: airflow/cli/commands/task_command.py
##########
@@ -321,38 +340,39 @@ def _guess_debugger():
 
 @cli_utils.action_logging
 @suppress_logs_and_warning
-def task_states_for_dag_run(args):
+@provide_session
+def task_states_for_dag_run(args, session=None):
     """Get the status of all task instances in a DagRun"""
-    with create_session() as session:
-        tis = (
-            session.query(
-                TaskInstance.dag_id,
-                TaskInstance.execution_date,
-                TaskInstance.task_id,
-                TaskInstance.state,
-                TaskInstance.start_date,
-                TaskInstance.end_date,
-            )
-            .filter(TaskInstance.dag_id == args.dag_id, TaskInstance.execution_date == args.execution_date)
-            .all()
+    try:
+        execution_date = timezone.parse(args.execution_date_or_run_id)
+        dag_run = (
+            session.query(DagRun)
+            .filter(DagRun.execution_date == execution_date, DagRun.dag_id == args.dag_id)
+            .one_or_none()
         )
-
-        if len(tis) == 0:
-            raise AirflowException("DagRun does not exist.")
-
-        AirflowConsole().print_as(
-            data=tis,
-            output=args.output,
-            mapper=lambda ti: {
-                "dag_id": ti.dag_id,
-                "execution_date": ti.execution_date.isoformat(),
-                "task_id": ti.task_id,
-                "state": ti.state,
-                "start_date": ti.start_date.isoformat() if ti.start_date else "",
-                "end_date": ti.end_date.isoformat() if ti.end_date else "",
-            },
+    except (ParserError, TypeError):
+        dag_run = (
+            session.query(DagRun)
+            .filter(DagRun.run_id == args.execution_date_or_run_id, DagRun.dag_id == args.dag_id)
+            .one_or_none()

Review comment:
       So long as the order is the same everywhere (and the "right" way going forward should be run_id first, then fallback), the lookup is at least consistent.







[GitHub] [airflow] ashb commented on a change in pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16030:
URL: https://github.com/apache/airflow/pull/16030#discussion_r641842538



##########
File path: airflow/models/dag.py
##########
@@ -905,22 +905,24 @@ def get_num_active_runs(self, external_trigger=None, session=None):
         return query.scalar()
 
     @provide_session
-    def get_dagrun(self, execution_date, session=None):
+    def get_dagrun(self, execution_date: str = None, run_id: str = None, session=None):
         """
-        Returns the dag run for a given execution date if it exists, otherwise
+        Returns the dag run for a given execution date or run_id if it exists, otherwise
         none.
 
         :param execution_date: The execution date of the DagRun to find.
+        :param run_id: The run_id of the DagRun to find.
         :param session:
         :return: The DagRun if found, otherwise None.
         """
-        dagrun = (
-            session.query(DagRun)
-            .filter(DagRun.dag_id == self.dag_id, DagRun.execution_date == execution_date)
-            .first()
-        )
-
-        return dagrun
+        if not (execution_date or run_id):
+            raise AirflowException("You must provide either the execution_date or the run_id")

Review comment:
       There is a built in error that has this meaning, so we should raise that.
   
   ```suggestion
               raise TypeError("You must provide either the execution_date or the run_id")
   ```
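
A small sketch of the behaviour this suggestion asks for, assuming a hypothetical `require_lookup_key` helper rather than the real `DAG.get_dagrun`. Calling a function without any of its required inputs is the situation the built-in `TypeError` already describes.

```python
from typing import Optional


def require_lookup_key(execution_date: Optional[str] = None, run_id: Optional[str] = None) -> str:
    """Hypothetical helper: insist on at least one lookup key and return the one to use."""
    if not (execution_date or run_id):
        # Same message as in the suggestion above, raised as the built-in error.
        raise TypeError("You must provide either the execution_date or the run_id")
    # Prefer run_id when both are given.
    return run_id or execution_date
```

Callers that forget both arguments then fail the same way they would for any other missing argument.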

##########
File path: airflow/cli/commands/task_command.py
##########
@@ -42,9 +45,26 @@
     get_dags,
     suppress_logs_and_warning,
 )
+from airflow.utils.dates import timezone
 from airflow.utils.log.logging_mixin import StreamLogWriter
 from airflow.utils.net import get_hostname
-from airflow.utils.session import create_session
+from airflow.utils.session import create_session, provide_session
+
+
+def _get_ti(dag, task, exec_date_or_run_id):

Review comment:
       ```suggestion
   def _get_ti(dag, task_id, exec_date_or_run_id):
   ```







[GitHub] [airflow] ashb commented on a change in pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16030:
URL: https://github.com/apache/airflow/pull/16030#discussion_r646403110



##########
File path: airflow/cli/commands/task_command.py
##########
@@ -42,9 +45,29 @@
     get_dags,
     suppress_logs_and_warning,
 )
+from airflow.utils.dates import timezone
 from airflow.utils.log.logging_mixin import StreamLogWriter
 from airflow.utils.net import get_hostname
-from airflow.utils.session import create_session
+from airflow.utils.session import create_session, provide_session
+
+
+def _get_ti(dag, task, exec_date_or_run_id):
+    """Get the task instance through DagRun.run_id, if that fails, get the TI the old way"""
+    dag_run = dag.get_dagrun(run_id=exec_date_or_run_id)
+    if not dag_run:
+        try:
+            execution_date = timezone.parse(exec_date_or_run_id)
+            # print("Warning: execution_date will be removed in
+            # tasks command in the future. Please use DagRun.run_id")

Review comment:
       Once we have corrected Airflow itself to use the new version, this should print something (i.e. add this in the "next" PR).
   ```suggestion
   ```







[GitHub] [airflow] github-actions[bot] commented on pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #16030:
URL: https://github.com/apache/airflow/pull/16030#issuecomment-855832691


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16030:
URL: https://github.com/apache/airflow/pull/16030#discussion_r646431178



##########
File path: airflow/cli/commands/task_command.py
##########
@@ -42,9 +45,29 @@
     get_dags,
     suppress_logs_and_warning,
 )
+from airflow.utils.dates import timezone
 from airflow.utils.log.logging_mixin import StreamLogWriter
 from airflow.utils.net import get_hostname
-from airflow.utils.session import create_session
+from airflow.utils.session import create_session, provide_session
+
+
+def _get_ti(dag, task, exec_date_or_run_id):
+    """Get the task instance through DagRun.run_id, if that fails, get the TI the old way"""
+    dag_run = dag.get_dagrun(run_id=exec_date_or_run_id)
+    if not dag_run:
+        try:
+            execution_date = timezone.parse(exec_date_or_run_id)
+            # print("Warning: execution_date will be removed in
+            # tasks command in the future. Please use DagRun.run_id")
+            ti = TaskInstance(task, execution_date)

Review comment:
       Yes. Having it refreshed here is best!








[GitHub] [airflow] kaxil commented on a change in pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16030:
URL: https://github.com/apache/airflow/pull/16030#discussion_r646568767



##########
File path: airflow/cli/commands/task_command.py
##########
@@ -42,9 +45,27 @@
     get_dags,
     suppress_logs_and_warning,
 )
+from airflow.utils.dates import timezone
 from airflow.utils.log.logging_mixin import StreamLogWriter
 from airflow.utils.net import get_hostname
-from airflow.utils.session import create_session
+from airflow.utils.session import create_session, provide_session
+
+
+def _get_ti(task, exec_date_or_run_id):
+    """Get the task instance through DagRun.run_id, if that fails, get the TI the old way"""
+    dag_run = task.dag.get_dagrun(run_id=exec_date_or_run_id)
+    if not dag_run:
+        try:
+            execution_date = timezone.parse(exec_date_or_run_id)
+            ti = TaskInstance(task, execution_date)
+            ti.refresh_from_db()
+            return ti
+        except (ParserError, TypeError):
+            raise AirflowException(f"DagRun with run_id: {exec_date_or_run_id} not found")
+    ti = dag_run.get_task_instance(task.task_id)
+    ti.task = task
+    ti.refresh_from_db()

Review comment:
       Do we need this call? Since L65 gets the TI from the DB itself, this feels redundant unless I am missing something.







[GitHub] [airflow] mik-laj commented on pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #16030:
URL: https://github.com/apache/airflow/pull/16030#issuecomment-847312093


   It looks like a breaking change. Am I right?





[GitHub] [airflow] ephraimbuddy commented on pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16030:
URL: https://github.com/apache/airflow/pull/16030#issuecomment-859347413









[GitHub] [airflow] ephraimbuddy commented on pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16030:
URL: https://github.com/apache/airflow/pull/16030#issuecomment-858688497


   > @ephraimbuddy Looks like we've got some test failures here to look at 😢
   > 
   > ```
   >   [2021-06-10 00:49:47,288] {local_task_job.py:197} WARNING - State of this instance has been externally set to None. Terminating instance.
   >   Running <TaskInstance: test_mark_failure.test_on_failure 2016-01-01T00:00:00+00:00 [failed]> on host d86bc15dcf39
   >   [2021-06-10 00:49:48,376] {local_task_job.py:197} WARNING - State of this instance has been externally set to failed. Terminating instance.
   > ```
   > 
   > Hmmm!
   
   I think it’s related to SIGTERM. I want to get the other PR on SIGTERM merged. Working on the reviews.





[GitHub] [airflow] uranusjr commented on a change in pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16030:
URL: https://github.com/apache/airflow/pull/16030#discussion_r643922302



##########
File path: tests/cli/commands/test_task_command.py
##########
@@ -62,6 +62,11 @@ class TestCliTasks(unittest.TestCase):
     def setUpClass(cls):
         cls.dagbag = DagBag(include_examples=True)
         cls.parser = cli_parser.get_parser()
+        clear_db_runs()
+
+    @classmethod
+    def tearDownClass(cls) -> None:
+        clear_db_runs()

Review comment:
       I think this should be `tearDown`, not `tearDownClass`, because we don’t want the DAG runs to carry over to other tests in the same class.
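
   A minimal sketch of the difference, using only the standard `unittest` module. It is not taken from the PR: `FAKE_DAG_RUNS` and `clear_fake_runs()` are hypothetical stand-ins for the DagRun table and the `clear_db_runs()` helper. With `tearDown`, the cleanup runs after every test method, so the second test starts from an empty table.

```python
import unittest

# Hypothetical in-memory stand-in for the DagRun table; not Airflow code.
FAKE_DAG_RUNS = []


def clear_fake_runs():
    """Stand-in for the clear_db_runs() helper used in the diff."""
    FAKE_DAG_RUNS.clear()


class PerTestCleanup(unittest.TestCase):
    def tearDown(self):
        # Runs after every test method, so no rows leak into the next test.
        clear_fake_runs()

    def test_a_creates_run(self):
        FAKE_DAG_RUNS.append("run_a")
        self.assertEqual(FAKE_DAG_RUNS, ["run_a"])

    def test_b_starts_clean(self):
        # Passes only because tearDown cleared the state left by test_a.
        self.assertEqual(FAKE_DAG_RUNS, [])
```

   If the cleanup lived only in `tearDownClass`, it would run once after the last test in the class, and `test_b_starts_clean` would see the row left behind by `test_a_creates_run`.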







[GitHub] [airflow] ashb commented on a change in pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16030:
URL: https://github.com/apache/airflow/pull/16030#discussion_r646404666



##########
File path: airflow/cli/commands/task_command.py
##########
@@ -42,9 +45,29 @@
     get_dags,
     suppress_logs_and_warning,
 )
+from airflow.utils.dates import timezone
 from airflow.utils.log.logging_mixin import StreamLogWriter
 from airflow.utils.net import get_hostname
-from airflow.utils.session import create_session
+from airflow.utils.session import create_session, provide_session
+
+
+def _get_ti(dag, task, exec_date_or_run_id):
+    """Get the task instance through DagRun.run_id, if that fails, get the TI the old way"""
+    dag_run = dag.get_dagrun(run_id=exec_date_or_run_id)
+    if not dag_run:
+        try:
+            execution_date = timezone.parse(exec_date_or_run_id)
+            # print("Warning: execution_date will be removed in
+            # tasks command in the future. Please use DagRun.run_id")
+            ti = TaskInstance(task, execution_date)

Review comment:
       I was wondering whether it makes sense to add `ti.refresh_from_db()` to this path, and I think probably yes: even though only one of the tasks _needs_ it, this makes the behaviour predictable.
   
   We can then remove the call on L250.
   
   Thoughts?
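
   A rough sketch of the idea, assuming nothing about the final implementation. It uses no Airflow imports: `FakeTaskInstance`, `get_ti`, and the `runs` dict are hypothetical stand-ins, and `datetime.fromisoformat` replaces `timezone.parse` purely for illustration. The point is that both lookup paths end in the same refresh call, so callers never have to do it themselves.

```python
from datetime import datetime


class FakeTaskInstance:
    """Hypothetical stand-in for TaskInstance; records whether it was refreshed."""

    def __init__(self, task_id, execution_date):
        self.task_id = task_id
        self.execution_date = execution_date
        self.refreshed = False

    def refresh_from_db(self):
        # The real method reloads state from the metadata DB; here we only record the call.
        self.refreshed = True


def get_ti(task_id, runs, exec_date_or_run_id):
    """Look up by run_id first, then fall back to parsing an execution date.

    `runs` maps run_id -> execution_date and stands in for the DagRun table.
    """
    if exec_date_or_run_id in runs:
        ti = FakeTaskInstance(task_id, runs[exec_date_or_run_id])
    else:
        try:
            execution_date = datetime.fromisoformat(exec_date_or_run_id)
        except (ValueError, TypeError):
            raise LookupError(
                f"DagRun with run_id: {exec_date_or_run_id} not found"
            ) from None
        ti = FakeTaskInstance(task_id, execution_date)
    # Single refresh shared by both paths, so the returned object is always up to date.
    ti.refresh_from_db()
    return ti
```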







[GitHub] [airflow] github-actions[bot] commented on pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #16030:
URL: https://github.com/apache/airflow/pull/16030#issuecomment-847312117


   [The Workflow run](https://github.com/apache/airflow/actions/runs/872553240) is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason.





[GitHub] [airflow] ephraimbuddy closed pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed pull request #16030:
URL: https://github.com/apache/airflow/pull/16030


   





[GitHub] [airflow] ashb commented on a change in pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16030:
URL: https://github.com/apache/airflow/pull/16030#discussion_r646404991



##########
File path: airflow/cli/commands/task_command.py
##########
@@ -42,9 +45,29 @@
     get_dags,
     suppress_logs_and_warning,
 )
+from airflow.utils.dates import timezone
 from airflow.utils.log.logging_mixin import StreamLogWriter
 from airflow.utils.net import get_hostname
-from airflow.utils.session import create_session
+from airflow.utils.session import create_session, provide_session
+
+
+def _get_ti(dag, task, exec_date_or_run_id):
+    """Get the task instance through DagRun.run_id, if that fails, get the TI the old way"""
+    dag_run = dag.get_dagrun(run_id=exec_date_or_run_id)
+    if not dag_run:
+        try:
+            execution_date = timezone.parse(exec_date_or_run_id)
+            # print("Warning: execution_date will be removed in
+            # tasks command in the future. Please use DagRun.run_id")
+            ti = TaskInstance(task, execution_date)
+            return ti
+        except (ParserError, TypeError):
+            raise AirflowException(f"DagRun with run_id: {exec_date_or_run_id} not found")
+    ti = dag_run.get_task_instance(task.task_id)
+    ti.task = task
+    ti.run_as_user = task.run_as_user
+    ti.execution_date = dag_run.execution_date

Review comment:
       _this_ line doesn't feel like it should be needed.







[GitHub] [airflow] ashb commented on a change in pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16030:
URL: https://github.com/apache/airflow/pull/16030#discussion_r646425071



##########
File path: airflow/cli/commands/task_command.py
##########
@@ -42,9 +45,27 @@
     get_dags,
     suppress_logs_and_warning,
 )
+from airflow.utils.dates import timezone
 from airflow.utils.log.logging_mixin import StreamLogWriter
 from airflow.utils.net import get_hostname
-from airflow.utils.session import create_session
+from airflow.utils.session import create_session, provide_session
+
+
+def _get_ti(dag, task, exec_date_or_run_id):

Review comment:
       Or alternatively, do we need to pass `task` here? Passing both seems unnecessary.







[GitHub] [airflow] ashb commented on a change in pull request #16030: Update `airflow tasks *` commands to lookup TaskInstances from DagRun Table

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16030:
URL: https://github.com/apache/airflow/pull/16030#discussion_r646424587



##########
File path: airflow/cli/commands/task_command.py
##########
@@ -42,9 +45,27 @@
     get_dags,
     suppress_logs_and_warning,
 )
+from airflow.utils.dates import timezone
 from airflow.utils.log.logging_mixin import StreamLogWriter
 from airflow.utils.net import get_hostname
-from airflow.utils.session import create_session
+from airflow.utils.session import create_session, provide_session
+
+
+def _get_ti(dag, task, exec_date_or_run_id):

Review comment:
       Do we need to pass `dag` here -- isn't that available as `task.dag`?



