You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/12/07 11:36:49 UTC

[GitHub] [airflow] ashb commented on a change in pull request #18724: Use `DagRun.run_id` instead of `execution_date` when updating state of TIs(UI & REST API)

ashb commented on a change in pull request #18724:
URL: https://github.com/apache/airflow/pull/18724#discussion_r754300938



##########
File path: airflow/api_connexion/endpoints/task_instance_endpoint.py
##########
@@ -297,15 +296,32 @@ def post_set_task_instances_state(dag_id, session):
         error_message = f"Task ID {task_id} not found"
         raise NotFound(error_message)
 
-    execution_date = data['execution_date']
-    try:
-        session.query(TI).filter_by(execution_date=execution_date, task_id=task_id, dag_id=dag_id).one()
-    except NoResultFound:
-        raise NotFound(f"Task instance not found for task {task_id} on execution_date {execution_date}")
+    execution_date = data.get('execution_date')
+    run_id = data.get('dag_run_id')
+
+    if run_id and not execution_date:
+        ti = (
+            session.query(func.count(TI.dag_id))
+            .filter(TI.task_id == task_id, TI.dag_id == dag_id, TI.run_id == run_id)
+            .scalar()
+        )
+        if not ti:
+            raise NotFound(detail=f"Task instance not found for task {task_id} on DAG run with ID {run_id}")
+    if execution_date and not run_id:
+        ti = (
+            session.query(func.count(TI.dag_id))
+            .filter(TI.execution_date == execution_date, TI.task_id == task_id, TI.dag_id == dag_id)
+            .scalar()
+        )
+        if not ti:
+            raise NotFound(
+                detail=f"Task instance not found for task {task_id} on execution_date {execution_date}"

Review comment:
       ```suggestion
                   detail=f"Task instance not found for task {task_id!r} on execution_date {execution_date!r}"
   ```

##########
File path: airflow/api_connexion/schemas/task_instance_schema.py
##########
@@ -129,13 +129,22 @@ class SetTaskInstanceStateFormSchema(Schema):
 
     dry_run = fields.Boolean(dump_default=True)
     task_id = fields.Str(required=True)
-    execution_date = fields.DateTime(required=True, validate=validate_istimezone)
+    execution_date = fields.DateTime(validate=validate_istimezone)
+    dag_run_id = fields.Str()

Review comment:
       Oh, this should probably just be called `run_id` -- that is what the column is anyway, and what we use in most/all of the function arguments.

##########
File path: airflow/api/common/experimental/mark_tasks.py
##########
@@ -33,6 +34,7 @@
 from airflow.utils.types import DagRunType
 
 
+@deprecated(reason="Use airflow.api.common.mark_tasks._create_dagruns instead", version="2.2.3")
 def _create_dagruns(dag, execution_dates, state, run_type):

Review comment:
       This is a private function (a leading underscore is the Python convention for that), so we shouldn't have to deprecate it; we can just delete it and update whatever uses it to use the new version.

##########
File path: airflow/api/common/experimental/mark_tasks.py
##########
@@ -60,6 +62,7 @@ def _create_dagruns(dag, execution_dates, state, run_type):
     return dag_runs
 
 
+@deprecated(reason="Use airflow.api.common.mark_tasks.set_state instead", version="2.2.3")

Review comment:
       If the behaviour of these functions is the same, then we should do something like this instead for the whole file:
   
   
   ```
   import warnings
   
   from airflow.api.common.mark_tasks import *  # noqa
   
   warnings.warn(
       "This module is deprecated. Please use `airflow.api.common.mark_tasks` instead.", DeprecationWarning, stacklevel=2
   )
   
   ```

##########
File path: airflow/api_connexion/endpoints/task_instance_endpoint.py
##########
@@ -297,15 +296,32 @@ def post_set_task_instances_state(dag_id, session):
         error_message = f"Task ID {task_id} not found"
         raise NotFound(error_message)
 
-    execution_date = data['execution_date']
-    try:
-        session.query(TI).filter_by(execution_date=execution_date, task_id=task_id, dag_id=dag_id).one()
-    except NoResultFound:
-        raise NotFound(f"Task instance not found for task {task_id} on execution_date {execution_date}")
+    execution_date = data.get('execution_date')
+    run_id = data.get('dag_run_id')
+
+    if run_id and not execution_date:
+        ti = (
+            session.query(func.count(TI.dag_id))
+            .filter(TI.task_id == task_id, TI.dag_id == dag_id, TI.run_id == run_id)
+            .scalar()
+        )
+        if not ti:
+            raise NotFound(detail=f"Task instance not found for task {task_id} on DAG run with ID {run_id}")

Review comment:
       ```suggestion
               raise NotFound(detail=f"Task instance not found for task {task_id!r} on DAG run with ID {run_id!r}")
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org