Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/15 13:09:35 UTC

[GitHub] [airflow] yuqian90 opened a new pull request #15382: Fix dag.clear() to set multiple dags to running when necessary

yuqian90 opened a new pull request #15382:
URL: https://github.com/apache/airflow/pull/15382


   closes: #14260
   related: #4776
   ---
   When clearing tasks across DAGs using `ExternalTaskMarker`, the state of the external `DagRun` is not set to active, so cleared tasks in the external DAG will not start automatically if that `DagRun` is in a failed or succeeded state.
   #4776 tried to fix a similar issue for subdags, but it did not cover `ExternalTaskMarker`. This PR fixes both.
   
   Two changes are made to fix the issue:
   1. Make `clear_task_instances` set the state of every affected DagRun to `dag_run_state`.
   2. Fix the filter for `DagRun` in `clear_task_instances`. Previously, it assumed that the execution_dates for all the dag_ids are the same, which is not always correct.
   
   `test_external_task_marker_clear_activate` is added to make sure the fix does the right thing.
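
To make the first change concrete, here is a minimal in-memory sketch of the intended behaviour. It is not the Airflow implementation: `Run`, `Ti`, and `activate_affected_runs` are hypothetical stand-ins, and states are plain strings rather than `State` constants. It only illustrates that every run owning a cleared task instance, including the one in the external DAG, gets the requested state, while unrelated runs are left alone.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Run:
    """Hypothetical stand-in for a DagRun row."""
    dag_id: str
    execution_date: str
    state: str


@dataclass(frozen=True)
class Ti:
    """Hypothetical stand-in for a cleared task instance."""
    dag_id: str
    execution_date: str


def activate_affected_runs(tis, runs, dag_run_state="running"):
    """Set dag_run_state on every run that owns at least one cleared task instance."""
    # Group the execution dates per dag_id so each DAG is matched on its own dates.
    dates_by_dag_id = defaultdict(set)
    for ti in tis:
        dates_by_dag_id[ti.dag_id].add(ti.execution_date)

    for run in runs:
        if run.execution_date in dates_by_dag_id.get(run.dag_id, ()):
            run.state = dag_run_state
    return runs


runs = [
    Run("parent", "2021-01-01", "success"),
    Run("child", "2021-01-02", "failed"),   # external run reached via the marker
    Run("child", "2021-01-01", "success"),  # not cleared, must stay untouched
]
tis = [Ti("parent", "2021-01-01"), Ti("child", "2021-01-02")]
activate_affected_runs(tis, runs)
```

After the call, the first two runs are set to the requested state and the third keeps its original state.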


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] yuqian90 commented on pull request #15382: Fix dag.clear() to set multiple dags to running when necessary

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on pull request #15382:
URL: https://github.com/apache/airflow/pull/15382#issuecomment-849272260


   > What's the status of this? @yuqian90 are you just waiting for a final review?
   
   Yes. The PR has been tested and works well. Just waiting for reviews.





[GitHub] [airflow] yuqian90 merged pull request #15382: Fix dag.clear() to set multiple dags to running when necessary

Posted by GitBox <gi...@apache.org>.
yuqian90 merged pull request #15382:
URL: https://github.com/apache/airflow/pull/15382


   





[GitHub] [airflow] github-actions[bot] commented on pull request #15382: Fix dag.clear() to set multiple dags to running when necessary

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15382:
URL: https://github.com/apache/airflow/pull/15382#issuecomment-849711728


   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it onto the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] yuqian90 commented on a change in pull request #15382: Fix dag.clear() to set multiple dags to running when necessary

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #15382:
URL: https://github.com/apache/airflow/pull/15382#discussion_r614626032



##########
File path: airflow/models/taskinstance.py
##########
@@ -205,19 +206,25 @@ def clear_task_instances(
         for job in session.query(BaseJob).filter(BaseJob.id.in_(job_ids)).all():  # noqa
             job.state = State.SHUTDOWN
 
-    if activate_dag_runs and tis:
+    if (dag_run_state is not False) and tis:
         from airflow.models.dagrun import DagRun  # Avoid circular import
 
+        dates_by_dag_id = defaultdict(set)
+        for instance in tis:
+            dates_by_dag_id[instance.dag_id].add(instance.execution_date)
+
         drs = (
             session.query(DagRun)
             .filter(
-                DagRun.dag_id.in_({ti.dag_id for ti in tis}),
-                DagRun.execution_date.in_({ti.execution_date for ti in tis}),
+                or_(
+                    and_(DagRun.dag_id == dag_id, DagRun.execution_date.in_(dates))

Review comment:
       This filter is fixed. Previously it assumed that the execution_dates of all the DagRuns were the same, which is not always true. The fix is to query hierarchically: match each dag_id first, then only the execution_dates that belong to that dag_id.
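
The difference between the two filter shapes can be reproduced with plain SQL. The sketch below uses the standard-library `sqlite3` module and a hypothetical two-column `dag_run` table rather than Airflow's models or SQLAlchemy. It assumes the new filter is an OR over per-DAG clauses, as the visible `or_(and_(...))` line suggests; the rest of the diff is not shown here.

```python
import sqlite3
from collections import defaultdict

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_run (dag_id TEXT, execution_date TEXT)")
conn.executemany(
    "INSERT INTO dag_run VALUES (?, ?)",
    [
        ("parent", "2021-01-01"),
        ("parent", "2021-01-02"),
        ("child", "2021-01-01"),
        ("child", "2021-01-02"),
    ],
)

# Cleared task instances as (dag_id, execution_date) pairs.
tis = [("parent", "2021-01-01"), ("child", "2021-01-02")]


def placeholders(values):
    """Return '?, ?, ...' with one placeholder per value."""
    return ", ".join("?" for _ in values)


# Old shape: two independent IN filters, which match the cross product
# of all dag_ids and all execution_dates.
dag_ids = sorted({dag_id for dag_id, _ in tis})
dates = sorted({execution_date for _, execution_date in tis})
old_rows = conn.execute(
    "SELECT dag_id, execution_date FROM dag_run "
    f"WHERE dag_id IN ({placeholders(dag_ids)}) "
    f"AND execution_date IN ({placeholders(dates)}) ORDER BY 1, 2",
    dag_ids + dates,
).fetchall()

# New shape: one (dag_id = ? AND execution_date IN (...)) clause per DAG,
# joined with OR.
dates_by_dag_id = defaultdict(set)
for dag_id, execution_date in tis:
    dates_by_dag_id[dag_id].add(execution_date)

clauses, params = [], []
for dag_id, dag_dates in sorted(dates_by_dag_id.items()):
    ordered = sorted(dag_dates)
    clauses.append(f"(dag_id = ? AND execution_date IN ({placeholders(ordered)}))")
    params.extend([dag_id, *ordered])

new_rows = conn.execute(
    "SELECT dag_id, execution_date FROM dag_run WHERE "
    + " OR ".join(clauses)
    + " ORDER BY 1, 2",
    params,
).fetchall()
```

With this data, the old shape returns all four rows, while the per-DAG shape returns only the two runs that actually own a cleared task instance.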







[GitHub] [airflow] uranusjr commented on a change in pull request #15382: Fix dag.clear() to set multiple dags to running when necessary

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15382:
URL: https://github.com/apache/airflow/pull/15382#discussion_r616962489



##########
File path: airflow/api_connexion/endpoints/task_instance_endpoint.py
##########
@@ -251,18 +251,8 @@ def post_clear_task_instances(dag_id: str, session=None):
     task_instances = dag.clear(get_tis=True, **data)
     if not data["dry_run"]:
         clear_task_instances(
-            task_instances,
-            session,
-            dag=dag,
-            activate_dag_runs=False,  # We will set DagRun state later.
+            task_instances, session, dag=dag, dag_run_state=State.RUNNING if reset_dag_runs else None
         )
-        if reset_dag_runs:
-            dag.set_dag_runs_state(

Review comment:
       I’d say it’s better to remove it here; if the code is dead after a bug fix, that code is simply wrong and should not exist 🙂 







[GitHub] [airflow] ashb commented on a change in pull request #15382: Fix dag.clear() to set multiple dags to running when necessary

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15382:
URL: https://github.com/apache/airflow/pull/15382#discussion_r640528645



##########
File path: airflow/api_connexion/endpoints/task_instance_endpoint.py
##########
@@ -251,18 +251,8 @@ def post_clear_task_instances(dag_id: str, session=None):
     task_instances = dag.clear(get_tis=True, **data)
     if not data["dry_run"]:
         clear_task_instances(
-            task_instances,
-            session,
-            dag=dag,
-            activate_dag_runs=False,  # We will set DagRun state later.
+            task_instances, session, dag=dag, dag_run_state=State.RUNNING if reset_dag_runs else None
         )
-        if reset_dag_runs:
-            dag.set_dag_runs_state(

Review comment:
       I'd like to remove it, but this is potentially a breaking change 😢 (who knows what users might be doing), so please add it back and make it issue a DeprecationWarning instead.







[GitHub] [airflow] ashb commented on a change in pull request #15382: Fix dag.clear() to set multiple dags to running when necessary

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15382:
URL: https://github.com/apache/airflow/pull/15382#discussion_r640719602



##########
File path: airflow/models/dag.py
##########
@@ -1126,6 +1126,11 @@ def set_dag_runs_state(
         end_date: Optional[datetime] = None,
         dag_ids: List[str] = None,
     ) -> None:
+        warnings.warn(
+            "This method is deprecated and will be removed in a future version.",
+            DeprecationWarning,
+            stacklevel=3,

Review comment:
       ```suggestion
               stacklevel=2,
   ```
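
For context on the suggestion, `stacklevel` controls which frame a warning is attributed to: `1` points at the `warnings.warn` call itself, `2` at the code that called the deprecated function, and `3` one frame further up. The sketch below is a standalone illustration and not Airflow code; `deprecated_method`, `caller`, `entry_point`, and `attributed_to` are hypothetical names.

```python
import warnings


def deprecated_method(stacklevel):
    """Hypothetical stand-in for a deprecated method."""
    warnings.warn(
        "This method is deprecated and will be removed in a future version.",
        DeprecationWarning,
        stacklevel=stacklevel,
    )


def caller(stacklevel):
    """The code that calls the deprecated method directly."""
    deprecated_method(stacklevel)


def entry_point(stacklevel):
    """One frame above the direct caller."""
    caller(stacklevel)


FUNCS = (deprecated_method, caller, entry_point)


def attributed_to(stacklevel):
    """Return the name of the function whose line the warning is reported against."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        entry_point(stacklevel)
    lineno = caught[0].lineno
    # The owning function is the last one defined at or before the reported line.
    owner = max(
        (f for f in FUNCS if f.__code__.co_firstlineno <= lineno),
        key=lambda f: f.__code__.co_firstlineno,
    )
    return owner.__name__
```

In this sketch, `attributed_to(2)` resolves to `caller`, which is the line a user has to change, whereas `attributed_to(3)` resolves to `entry_point`, one frame too far from the deprecated call.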







[GitHub] [airflow] uranusjr commented on a change in pull request #15382: Fix dag.clear() to set multiple dags to running when necessary

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15382:
URL: https://github.com/apache/airflow/pull/15382#discussion_r616963403



##########
File path: airflow/models/taskinstance.py
##########
@@ -205,19 +206,25 @@ def clear_task_instances(
         for job in session.query(BaseJob).filter(BaseJob.id.in_(job_ids)).all():  # noqa
             job.state = State.SHUTDOWN
 
-    if activate_dag_runs and tis:
+    if (dag_run_state is not False) and tis:

Review comment:
       ```suggestion
       if dag_run_state is not False and tis:
   ```
   
   I don’t think parentheses are needed here? If this feels ambiguous, maybe swap them?
   
   ```python
       if tis and dag_run_state is not False:
   ```
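
The claim that the parentheses are redundant can be checked with the standard-library `ast` module: comparison operators such as `is not` bind more tightly than `and`, so both spellings parse to the same tree. This is a small verification sketch; `shape` is a hypothetical helper, and the variable names are only parsed, never evaluated.

```python
import ast


def shape(source):
    """Return the AST dump of an expression, without position information."""
    return ast.dump(ast.parse(source, mode="eval").body)


with_parens = shape("(dag_run_state is not False) and tis")
without_parens = shape("dag_run_state is not False and tis")
swapped = shape("tis and dag_run_state is not False")

# Both spellings produce the same tree, so the parentheses change nothing.
same_tree = with_parens == without_parens
```

The swapped form parses to a different tree because the operands are in a different order, but it has the same truthiness for every combination of inputs, so it is equivalent when used as an `if` condition.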







[GitHub] [airflow] potiuk commented on pull request #15382: Fix dag.clear() to set multiple dags to running when necessary

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #15382:
URL: https://github.com/apache/airflow/pull/15382#issuecomment-850311903


   Yeah. There are some teething problems with MSSQL, and @aneesh-joseph created #16134, which might help!





[GitHub] [airflow] potiuk commented on pull request #15382: Fix dag.clear() to set multiple dags to running when necessary

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #15382:
URL: https://github.com/apache/airflow/pull/15382#issuecomment-867744775


   > Do we have an idea of when this fix will be released and made available via PyPI?
   
   It's cherry-picked to the 2.1 branch and we are just about to release 2.1.1 with it (we are in feature-freeze/final test mode for it). Expect the final release in ~1 week if everything goes well.





[GitHub] [airflow] yuqian90 commented on a change in pull request #15382: Fix dag.clear() to set multiple dags to running when necessary

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #15382:
URL: https://github.com/apache/airflow/pull/15382#discussion_r614055232



##########
File path: airflow/api_connexion/endpoints/task_instance_endpoint.py
##########
@@ -251,18 +251,8 @@ def post_clear_task_instances(dag_id: str, session=None):
     task_instances = dag.clear(get_tis=True, **data)
     if not data["dry_run"]:
         clear_task_instances(
-            task_instances,
-            session,
-            dag=dag,
-            activate_dag_runs=False,  # We will set DagRun state later.
+            task_instances, session, dag=dag, dag_run_state=State.RUNNING if reset_dag_runs else None
         )
-        if reset_dag_runs:
-            dag.set_dag_runs_state(

Review comment:
       `dag.set_dag_runs_state()` is no longer used after this PR. I feel we should remove it. But I'm not sure if that's a good thing to do in a bug fix. So I didn't do that here.







[GitHub] [airflow] jhtimmins commented on pull request #15382: Fix dag.clear() to set multiple dags to running when necessary

Posted by GitBox <gi...@apache.org>.
jhtimmins commented on pull request #15382:
URL: https://github.com/apache/airflow/pull/15382#issuecomment-849271227


   What's the status of this? @yuqian90 are you just waiting for a final review?





[GitHub] [airflow] ashb commented on pull request #15382: Fix dag.clear() to set multiple dags to running when necessary

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #15382:
URL: https://github.com/apache/airflow/pull/15382#issuecomment-850265819


   Two random failures on MSSQL2019 only (where the DB didn't create the user -- /cc @potiuk @aneesh-joseph )
   
   Merging this PR anyway





[GitHub] [airflow] yuqian90 commented on pull request #15382: Fix dag.clear() to set multiple dags to running when necessary

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on pull request #15382:
URL: https://github.com/apache/airflow/pull/15382#issuecomment-849668674


   > Please add back the removed fn (😭) then LGTM.
   
   Thanks. I put it back.





[GitHub] [airflow] skabbass1 commented on pull request #15382: Fix dag.clear() to set multiple dags to running when necessary

Posted by GitBox <gi...@apache.org>.
skabbass1 commented on pull request #15382:
URL: https://github.com/apache/airflow/pull/15382#issuecomment-867722411


   Do we have an idea of when this fix will be released and made available via PyPI?





[GitHub] [airflow] yuqian90 commented on a change in pull request #15382: Fix dag.clear() to set multiple dags to running when necessary

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #15382:
URL: https://github.com/apache/airflow/pull/15382#discussion_r640663521



##########
File path: airflow/api_connexion/endpoints/task_instance_endpoint.py
##########
@@ -251,18 +251,8 @@ def post_clear_task_instances(dag_id: str, session=None):
     task_instances = dag.clear(get_tis=True, **data)
     if not data["dry_run"]:
         clear_task_instances(
-            task_instances,
-            session,
-            dag=dag,
-            activate_dag_runs=False,  # We will set DagRun state later.
+            task_instances, session, dag=dag, dag_run_state=State.RUNNING if reset_dag_runs else None
         )
-        if reset_dag_runs:
-            dag.set_dag_runs_state(

Review comment:
       Done.







[GitHub] [airflow] potiuk closed pull request #15382: Fix dag.clear() to set multiple dags to running when necessary

Posted by GitBox <gi...@apache.org>.
potiuk closed pull request #15382:
URL: https://github.com/apache/airflow/pull/15382


   





[GitHub] [airflow] potiuk commented on pull request #15382: Fix dag.clear() to set multiple dags to running when necessary

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #15382:
URL: https://github.com/apache/airflow/pull/15382#issuecomment-847054840


   Closing/reopening to trigger build again.

