Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/12/04 18:04:31 UTC

[GitHub] [airflow] XD-DENG opened a new pull request #12815: Cleanup & improvements around scheduling

XD-DENG opened a new pull request #12815:
URL: https://github.com/apache/airflow/pull/12815


   ### 1. Cleanup
   Mainly to clean up stale docstrings.
   - Remove unneeded code line
   - Remove stale docstring
   - Fix wrong docstring
   - Fix stale doc image link in docstring
   
   ### 2. Improvements
   - Avoid unnecessary loop in `DagRun.schedule_tis()`, which is invoked inside `SchedulerJob`
   - Minor improvement on `DAG.deactivate_stale_dags()`, which is invoked inside `SchedulerJob`
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] XD-DENG commented on a change in pull request #12815: Cleanup & improvements around scheduling

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on a change in pull request #12815:
URL: https://github.com/apache/airflow/pull/12815#discussion_r536305168



##########
File path: airflow/models/dagrun.py
##########
@@ -718,19 +718,20 @@ def schedule_tis(self, schedulable_tis: Iterable[TI], session: Session = None) -
         All the TIs should belong to this DagRun, but this code is in the hot-path, this is not checked -- it
         is the caller's responsibility to call this function only with TIs from a single dag run.
         """
-        # Get list of TIs that do not need to executed, these are
+        # Get list of TI IDs that do not need to executed, these are
         # tasks using DummyOperator and without on_execute_callback / on_success_callback
-        dummy_tis = {
-            ti
-            for ti in schedulable_tis
+        dummy_ti_ids = []
+        schedulable_ti_ids = []
+        for ti in schedulable_tis:
             if (
                 ti.task.inherits_from_dummy_operator
                 and not ti.task.on_execute_callback
                 and not ti.task.on_success_callback
-            )
-        }
+            ):
+                dummy_ti_ids.append(ti.task_id)
+            else:
+                schedulable_ti_ids.append(ti.task_id)
 
-        schedulable_ti_ids = [ti.task_id for ti in schedulable_tis if ti not in dummy_tis]
         count = 0

Review comment:
       Personally I don't find it necessary (for now).
   
   On the other hand, if we abstract this into a separate function, as far as I can see it would more or less "duplicate" `helpers.partition()` (I don't want to use `helpers.partition()` here because it still traverses the iterable twice).
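To make the "traverses twice" point concrete, here is a minimal sketch. It assumes `helpers.partition()` follows the standard `itertools` `tee`/`filterfalse` recipe (an assumption, not a copy of Airflow's code); `partition_tee`, `partition_single_pass` and `CountingPredicate` are hypothetical names used only for illustration. Counting predicate evaluations shows the difference:

```python
from itertools import filterfalse, tee
from typing import Callable, Iterable, Iterator, List, Tuple, TypeVar

T = TypeVar("T")


class CountingPredicate:
    """Wraps a predicate and counts how many times it is evaluated."""

    def __init__(self, pred: Callable[[T], bool]) -> None:
        self.pred = pred
        self.calls = 0

    def __call__(self, item) -> bool:
        self.calls += 1
        return self.pred(item)


def partition_tee(
    pred: Callable[[T], bool], iterable: Iterable[T]
) -> Tuple[Iterator[T], Iterator[T]]:
    """itertools-recipe style: returns (false_items, true_items) as lazy
    iterators over a tee'd input; each half re-applies ``pred``."""
    iter_1, iter_2 = tee(iterable)
    return filterfalse(pred, iter_1), filter(pred, iter_2)


def partition_single_pass(
    pred: Callable[[T], bool], iterable: Iterable[T]
) -> Tuple[List[T], List[T]]:
    """Hypothetical helper: one traversal, one ``pred`` call per element."""
    false_items: List[T] = []
    true_items: List[T] = []
    for item in iterable:
        (true_items if pred(item) else false_items).append(item)
    return false_items, true_items


tee_pred = CountingPredicate(lambda n: n % 2 == 0)
false_it, true_it = partition_tee(tee_pred, range(10))
odds, evens = list(false_it), list(true_it)

single_pred = CountingPredicate(lambda n: n % 2 == 0)
odds2, evens2 = partition_single_pass(single_pred, range(10))

print(tee_pred.calls, single_pred.calls)  # 20 10
```

Both produce the same split, but once both halves are consumed the recipe-style version evaluates the predicate twice per element, while the single loop evaluates it once and returns plain lists.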







[GitHub] [airflow] XD-DENG merged pull request #12815: Cleanup & improvements around scheduling

Posted by GitBox <gi...@apache.org>.
XD-DENG merged pull request #12815:
URL: https://github.com/apache/airflow/pull/12815


   





[GitHub] [airflow] XD-DENG commented on a change in pull request #12815: Cleanup & improvements around scheduling

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on a change in pull request #12815:
URL: https://github.com/apache/airflow/pull/12815#discussion_r536287662



##########
File path: airflow/models/dag.py
##########
@@ -1966,7 +1966,7 @@ def deactivate_stale_dags(expiration_date, session=None):
             )
             dag.is_active = False
             session.merge(dag)
-            session.commit()
+        session.commit()

Review comment:
       In this case, in `models/dag.py` alone there are a few other usages of `session.commit()` in functions decorated with `@provide_session`. Should I address them in this PR as well?







[GitHub] [airflow] ashb commented on a change in pull request #12815: Cleanup & improvements around scheduling

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #12815:
URL: https://github.com/apache/airflow/pull/12815#discussion_r536288744



##########
File path: airflow/models/dag.py
##########
@@ -1966,7 +1966,7 @@ def deactivate_stale_dags(expiration_date, session=None):
             )
             dag.is_active = False
             session.merge(dag)
-            session.commit()
+        session.commit()

Review comment:
       Possibly. Do you think they all make sense as one PR?







[GitHub] [airflow] XD-DENG commented on a change in pull request #12815: Cleanup & improvements around scheduling

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on a change in pull request #12815:
URL: https://github.com/apache/airflow/pull/12815#discussion_r536283195



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -613,11 +613,6 @@ def process_file(
         3. For each DAG, see what tasks should run and create appropriate task
         instances in the DB.
         4. Record any errors importing the file into ORM
-        5. Kill (in ORM) any task instances belonging to the DAGs that haven't
-        issued a heartbeat in a while.
-
-        Returns a list of serialized_dag dicts that represent the DAGs found in
-        the file
 

Review comment:
       - `DagFileProcessor.process_file()` no longer takes care of killing zombie tasks.
   - The description of the return value here is stale.







[GitHub] [airflow] XD-DENG commented on a change in pull request #12815: Cleanup & improvements around scheduling

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on a change in pull request #12815:
URL: https://github.com/apache/airflow/pull/12815#discussion_r536281388



##########
File path: airflow/models/dagrun.py
##########
@@ -718,19 +718,20 @@ def schedule_tis(self, schedulable_tis: Iterable[TI], session: Session = None) -
         All the TIs should belong to this DagRun, but this code is in the hot-path, this is not checked -- it
         is the caller's responsibility to call this function only with TIs from a single dag run.
         """
-        # Get list of TIs that do not need to executed, these are
+        # Get list of TI IDs that do not need to executed, these are
         # tasks using DummyOperator and without on_execute_callback / on_success_callback
-        dummy_tis = {
-            ti
-            for ti in schedulable_tis
+        dummy_ti_ids = []
+        schedulable_ti_ids = []
+        for ti in schedulable_tis:
             if (
                 ti.task.inherits_from_dummy_operator
                 and not ti.task.on_execute_callback
                 and not ti.task.on_success_callback
-            )
-        }
+            ):
+                dummy_ti_ids.append(ti.task_id)
+            else:
+                schedulable_ti_ids.append(ti.task_id)
 
-        schedulable_ti_ids = [ti.task_id for ti in schedulable_tis if ti not in dummy_tis]
         count = 0

Review comment:
       This change is to ensure we traverse `schedulable_tis` only once, rather than twice. 
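A rough sketch of the before/after shapes of this hunk, using stub classes (`FakeTask` and `FakeTI` are hypothetical stand-ins for the operator and `TaskInstance`, exposing only the attributes the condition reads). Both versions classify the same TIs; the one-pass version also behaves correctly if `schedulable_tis` is a one-shot iterator, which the `Iterable[TI]` hint allows, whereas the old second traversal would then see nothing:

```python
from dataclasses import dataclass
from typing import Callable, Iterable, List, Optional, Tuple


@dataclass
class FakeTask:
    # Hypothetical stand-in for an operator: only the attributes the
    # condition in schedule_tis() reads.
    inherits_from_dummy_operator: bool = False
    on_execute_callback: Optional[Callable] = None
    on_success_callback: Optional[Callable] = None


@dataclass(eq=False)
class FakeTI:
    # Hypothetical stand-in for a TaskInstance. eq=False keeps identity
    # hashing, so instances can be stored in a set like the old code did.
    task_id: str
    task: FakeTask


def is_dummy(ti: FakeTI) -> bool:
    # Same condition as in the diff: dummy task with no callbacks.
    return (
        ti.task.inherits_from_dummy_operator
        and not ti.task.on_execute_callback
        and not ti.task.on_success_callback
    )


def split_two_pass(tis: Iterable[FakeTI]) -> Tuple[List[str], List[str]]:
    # Approximates the removed lines: a set comprehension, then a second
    # traversal with a membership test.
    dummy_tis = {ti for ti in tis if is_dummy(ti)}
    schedulable_ids = [ti.task_id for ti in tis if ti not in dummy_tis]
    return [ti.task_id for ti in dummy_tis], schedulable_ids


def split_one_pass(tis: Iterable[FakeTI]) -> Tuple[List[str], List[str]]:
    # Mirrors the added lines: each TI lands in exactly one list.
    dummy_ids: List[str] = []
    schedulable_ids: List[str] = []
    for ti in tis:
        if is_dummy(ti):
            dummy_ids.append(ti.task_id)
        else:
            schedulable_ids.append(ti.task_id)
    return dummy_ids, schedulable_ids


tis = [
    FakeTI("start", FakeTask(inherits_from_dummy_operator=True)),
    FakeTI("extract", FakeTask()),
    FakeTI("notify", FakeTask(inherits_from_dummy_operator=True, on_success_callback=print)),
    FakeTI("load", FakeTask()),
]

old_dummy, old_sched = split_two_pass(tis)
new_dummy, new_sched = split_one_pass(tis)

# With a one-shot generator, the two-pass version's second traversal is empty.
_, gen_sched_old = split_two_pass(ti for ti in tis)
_, gen_sched_new = split_one_pass(ti for ti in tis)
print(gen_sched_old, gen_sched_new)  # [] ['extract', 'notify', 'load']
```

Note that "notify" is scheduled normally because it has an `on_success_callback`, matching the condition in the diff.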







[GitHub] [airflow] XD-DENG commented on a change in pull request #12815: Cleanup & improvements around scheduling

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on a change in pull request #12815:
URL: https://github.com/apache/airflow/pull/12815#discussion_r536290925



##########
File path: airflow/models/dag.py
##########
@@ -1966,7 +1966,7 @@ def deactivate_stale_dags(expiration_date, session=None):
             )
             dag.is_active = False
             session.merge(dag)
-            session.commit()
+        session.commit()

Review comment:
       How about this: I will skip it in this PR and open another PR dedicated to cleaning up `session.commit()` calls project-wide, so that the scope of each PR is clearer. Agree?







[GitHub] [airflow] ashb commented on a change in pull request #12815: Cleanup & improvements around scheduling

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #12815:
URL: https://github.com/apache/airflow/pull/12815#discussion_r536283503



##########
File path: airflow/models/dag.py
##########
@@ -1966,7 +1966,7 @@ def deactivate_stale_dags(expiration_date, session=None):
             )
             dag.is_active = False
             session.merge(dag)
-            session.commit()
+        session.commit()

Review comment:
       This should actually just be
   ```suggestion
           session.flush()
   ```
   
   to be in line with https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#database-session-handling
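A minimal sketch of the convention being referenced, as I understand it: a function that receives a session does its work and at most calls `flush()`, and whoever created the session commits (or rolls back). `FakeSession`, `CREATED` and `provide_session_sketch` are hypothetical stand-ins, not Airflow's `provide_session` or SQLAlchemy's `Session`:

```python
import functools
from typing import Callable, List, Optional


class FakeSession:
    """Hypothetical stand-in for a SQLAlchemy Session that records calls."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def merge(self, obj: object) -> None:
        self.events.append(f"merge:{obj}")

    def flush(self) -> None:
        self.events.append("flush")

    def commit(self) -> None:
        self.events.append("commit")

    def rollback(self) -> None:
        self.events.append("rollback")


# Sessions created by the decorator, kept only so this demo can inspect them.
CREATED: List[FakeSession] = []


def provide_session_sketch(func: Callable) -> Callable:
    """Hypothetical decorator: whoever creates the session owns the transaction."""

    @functools.wraps(func)
    def wrapper(*args, session: Optional[FakeSession] = None, **kwargs):
        if session is not None:
            # The caller passed a session, so committing is the caller's job.
            return func(*args, session=session, **kwargs)
        session = FakeSession()
        CREATED.append(session)
        try:
            result = func(*args, session=session, **kwargs)
            session.commit()
            return result
        except Exception:
            session.rollback()
            raise

    return wrapper


@provide_session_sketch
def deactivate(dag_ids: List[str], session: Optional[FakeSession] = None) -> None:
    for dag_id in dag_ids:
        session.merge(dag_id)
    # Send pending changes to the database without ending the transaction.
    session.flush()


deactivate(["a", "b"])
print(CREATED[-1].events)  # ['merge:a', 'merge:b', 'flush', 'commit']

outer = FakeSession()
deactivate(["c"], session=outer)
print(outer.events)  # ['merge:c', 'flush']
```

When the caller supplies its own session, the callee's work stays inside the caller's transaction, so the caller can still roll it back together with anything else it did.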




[GitHub] [airflow] mik-laj commented on a change in pull request #12815: Cleanup & improvements around scheduling

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #12815:
URL: https://github.com/apache/airflow/pull/12815#discussion_r536295835



##########
File path: airflow/models/dagrun.py
##########
@@ -718,19 +718,20 @@ def schedule_tis(self, schedulable_tis: Iterable[TI], session: Session = None) -
         All the TIs should belong to this DagRun, but this code is in the hot-path, this is not checked -- it
         is the caller's responsibility to call this function only with TIs from a single dag run.
         """
-        # Get list of TIs that do not need to executed, these are
+        # Get list of TI IDs that do not need to executed, these are
         # tasks using DummyOperator and without on_execute_callback / on_success_callback
-        dummy_tis = {
-            ti
-            for ti in schedulable_tis
+        dummy_ti_ids = []
+        schedulable_ti_ids = []
+        for ti in schedulable_tis:
             if (
                 ti.task.inherits_from_dummy_operator
                 and not ti.task.on_execute_callback
                 and not ti.task.on_success_callback
-            )
-        }
+            ):
+                dummy_ti_ids.append(ti.task_id)
+            else:
+                schedulable_ti_ids.append(ti.task_id)
 
-        schedulable_ti_ids = [ti.task_id for ti in schedulable_tis if ti not in dummy_tis]
         count = 0

Review comment:
       Do you think it is worth moving this logic into a separate function? See: `airflow.utils.helpers.partition`







[GitHub] [airflow] XD-DENG commented on a change in pull request #12815: Cleanup & improvements around scheduling

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on a change in pull request #12815:
URL: https://github.com/apache/airflow/pull/12815#discussion_r536281711



##########
File path: airflow/models/dag.py
##########
@@ -1966,7 +1966,7 @@ def deactivate_stale_dags(expiration_date, session=None):
             )
             dag.is_active = False
             session.merge(dag)
-            session.commit()
+        session.commit()

Review comment:
       Similar to `deactivate_unknown_dags()`, commit only once outside the for-loop.
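A sketch of what moving the commit out of the loop changes, using the standard-library `sqlite3` module and a hypothetical two-column `dag` table rather than Airflow's models. Besides issuing one commit instead of one per DAG, committing once makes the update all-or-nothing if something fails partway through the loop:

```python
import sqlite3
from typing import List, Optional


def make_db() -> sqlite3.Connection:
    # Hypothetical minimal table; not Airflow's real schema.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, is_active INTEGER)")
    conn.executemany("INSERT INTO dag VALUES (?, 1)", [("a",), ("b",), ("c",)])
    conn.commit()
    return conn


def deactivate(
    conn: sqlite3.Connection,
    dag_ids: List[str],
    commit_each: bool,
    fail_on: Optional[str] = None,
) -> None:
    """Mark DAGs inactive; ``fail_on`` simulates an error partway through."""
    try:
        for dag_id in dag_ids:
            if dag_id == fail_on:
                raise RuntimeError(f"simulated failure on {dag_id}")
            conn.execute("UPDATE dag SET is_active = 0 WHERE dag_id = ?", (dag_id,))
            if commit_each:
                conn.commit()  # old shape: one commit per DAG
        if not commit_each:
            conn.commit()  # new shape: a single commit after the loop
    except RuntimeError:
        # Discard whatever has not been committed yet (demo: swallow the error).
        conn.rollback()


def active_ids(conn: sqlite3.Connection) -> List[str]:
    rows = conn.execute("SELECT dag_id FROM dag WHERE is_active = 1 ORDER BY dag_id")
    return [row[0] for row in rows]


per_dag = make_db()
deactivate(per_dag, ["a", "b", "c"], commit_each=True, fail_on="c")
print(active_ids(per_dag))  # ['c']: "a" and "b" were already committed

once = make_db()
deactivate(once, ["a", "b", "c"], commit_each=False, fail_on="c")
print(active_ids(once))  # ['a', 'b', 'c']: nothing was committed
```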







[GitHub] [airflow] XD-DENG commented on a change in pull request #12815: Cleanup & improvements around scheduling

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on a change in pull request #12815:
URL: https://github.com/apache/airflow/pull/12815#discussion_r536326980



##########
File path: airflow/models/dag.py
##########
@@ -1966,7 +1966,7 @@ def deactivate_stale_dags(expiration_date, session=None):
             )
             dag.is_active = False
             session.merge(dag)
-            session.commit()
+        session.commit()

Review comment:
       I created issue https://github.com/apache/airflow/issues/12818 for cleaning up these `session.commit()` calls.
   
   Since it's a relatively easy fix, I've marked it as a "**good first issue**"; let's see if any new contributor would like to pick it up (I will mention it in Slack).







[GitHub] [airflow] github-actions[bot] commented on pull request #12815: Cleanup & improvements around scheduling

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12815:
URL: https://github.com/apache/airflow/pull/12815#issuecomment-738952840


   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it onto the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

