[GitHub] [airflow] mik-laj opened a new pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns

Posted by GitBox <gi...@apache.org>.
mik-laj opened a new pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns
URL: https://github.com/apache/airflow/pull/7489
 
 
   Another performance optimization.
   When a DAG file contains 200 DAGs, and each DAG contains 5 tasks, we get the following values.
   **Before:**
   Query count:  1792
   Average time: 5842.346 ms
   **After:**
   Query count:  1395
   Average time: 4839.020 ms
   **Diff:**
   Query count: -397 (-22%)
   Average time: -1003 ms (-17%)
   
   If I didn't make a mistake, when we combine the following changes:
   * https://github.com/apache/airflow/pull/7481
   * https://github.com/apache/airflow/pull/7477
   * https://github.com/apache/airflow/pull/7476
   
   we get only... 5 queries per file instead of... 1792.
   
   Thanks to @evgenyshulman from Databand for the support!
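   
   For illustration, a minimal self-contained sketch of the pattern this PR applies (toy stand-ins, not the actual Airflow models): instead of issuing one `DagRun.find()` query per DAG, fetch the running runs for every DAG id in the file with a single query and group them in memory:
   
   ```python
   from collections import defaultdict
   
   QUERIES = 0  # counts simulated database round-trips
   
   def find_runs(dag_ids, table):
       """One query: all running runs whose dag_id is in `dag_ids`."""
       global QUERIES
       QUERIES += 1
       wanted = set(dag_ids)
       return [r for r in table if r["dag_id"] in wanted]
   
   table = [{"dag_id": "dag_%d" % i, "state": "running"} for i in range(200)]
   
   # Before: one find() per DAG would cost 200 queries.
   # After: one bulk query for the whole DAG file, grouped by dag_id.
   dag_runs_by_dag_id = defaultdict(list)
   for run in find_runs(["dag_%d" % i for i in range(200)], table):
       dag_runs_by_dag_id[run["dag_id"]].append(run)
   
   assert QUERIES == 1
   assert len(dag_runs_by_dag_id["dag_0"]) == 1
   ```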
   
   ---
   Issue link: [AIRFLOW-6869](https://issues.apache.org/jira/browse/AIRFLOW-6869)
   
   Make sure to mark the boxes below before creating PR: [x]
   
   - [X] Description above provides context of the change
   - [X] Commit message/PR title starts with `[AIRFLOW-NNNN]`. AIRFLOW-NNNN = JIRA ID<sup>*</sup>
   - [X] Unit tests coverage for changes (not needed for documentation changes)
   - [X] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [X] Relevant documentation is updated including usage instructions.
   - [X] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   <sup>*</sup> For document-only changes commit message can start with `[AIRFLOW-XXXX]`.
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   

[GitHub] [airflow] mik-laj commented on a change in pull request #7489: [AIRFLOW-6869][DONT-MERGE] Bulk fetch DAGRuns for _process_task_instances

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7489: [AIRFLOW-6869][DONT-MERGE] Bulk fetch DAGRuns for _process_task_instances
URL: https://github.com/apache/airflow/pull/7489#discussion_r384939770
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -699,23 +727,30 @@ def _process_dags(self, dagbag, dags, tis_out):
                 self.log.info("Not processing DAG %s since it's paused", dag.dag_id)
                 continue
 
-            self.log.info("Processing %s", dag.dag_id)
+            dag_id = dag.dag_id
+            self.log.info("Processing %s", dag_id)
+            dag_runs_for_dag = dag_runs_by_dag_id[dag_id] if dag_id in dag_runs_by_dag_id else []
 
 Review comment:
   Done

[GitHub] [airflow] mik-laj commented on a change in pull request #7489: [AIRFLOW-6869][DONT-MERGE] Bulk fetch DAGRuns for _process_task_instances

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7489: [AIRFLOW-6869][DONT-MERGE] Bulk fetch DAGRuns for _process_task_instances
URL: https://github.com/apache/airflow/pull/7489#discussion_r384939724
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -684,11 +687,36 @@ def _process_dags(self, dagbag, dags, tis_out):
         :type dagbag: airflow.models.DagBag
         :param dags: the DAGs from the DagBag to process
         :type dags: List[airflow.models.DAG]
-        :param tis_out: A list to add generated TaskInstance objects
-        :type tis_out: list[TaskInstance]
-        :rtype: None
+        :rtype: list[TaskInstance]
+        :return: A list of generated TaskInstance objects
         """
         check_slas = conf.getboolean('core', 'CHECK_SLAS', fallback=True)
+
+        tis_out = []
+        dag_ids = [dag.dag_id for dag in dags]
+        dag_runs = DagRun.find(dag_ids=dag_ids, state=State.RUNNING, session=session)
+        # list() is needed because of a bug in Python 3.7+
+        #
+        # The following code returns different values depending on the Python version
+        # from itertools import groupby
+        # from unittest.mock import MagicMock
+        # key = "key"
+        # item = MagicMock(attr=key)
+        # items = [item]
+        # items_by_attr = {k: v for k, v in groupby(items, lambda d: d.attr)}
+        # print("items_by_attr=", items_by_attr)
+        # item_with_key = list(items_by_attr[key]) if key in items_by_attr else []
+        # print("item_with_key=", item_with_key)
+        #
+        # Python 3.7+:
+        # items_by_attr= {'key': <itertools._grouper object at 0x7f3b9f38d4d0>}
+        # item_with_key= []
+        #
+        # Python 3.6:
+        # items_by_attr= {'key': <itertools._grouper object at 0x101128630>}
+        # item_with_key= [<MagicMock id='4310405416'>]
 
 Review comment:
   Done.

[GitHub] [airflow] mik-laj commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances
URL: https://github.com/apache/airflow/pull/7489#discussion_r383452035
 
 

 ##########
 File path: airflow/models/dagrun.py
 ##########
 @@ -125,14 +125,16 @@ def refresh_from_db(self, session=None):
 
     @staticmethod
     @provide_session
-    def find(dag_id=None, run_id=None, execution_date=None,
+    def find(dag_id=None, dag_ids=None, run_id=None, execution_date=None,
 
 Review comment:
   I will change it during the next rebase.

[GitHub] [airflow] houqp commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances
URL: https://github.com/apache/airflow/pull/7489#discussion_r383450533
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -684,11 +687,36 @@ def _process_dags(self, dagbag, dags, tis_out):
         :type dagbag: airflow.models.DagBag
         :param dags: the DAGs from the DagBag to process
         :type dags: List[airflow.models.DAG]
-        :param tis_out: A list to add generated TaskInstance objects
-        :type tis_out: list[TaskInstance]
-        :rtype: None
+        :rtype: list[TaskInstance]
+        :return: A list of generated TaskInstance objects
         """
         check_slas = conf.getboolean('core', 'CHECK_SLAS', fallback=True)
+
+        tis_out = []
+        dag_ids = [dag.dag_id for dag in dags]
+        dag_runs = DagRun.find(dag_ids=dag_ids, state=State.RUNNING, session=session)
+        # list() is needed because of a bug in Python 3.7+
+        #
+        # The following code returns different values depending on the Python version
+        # from itertools import groupby
+        # from unittest.mock import MagicMock
+        # key = "key"
+        # item = MagicMock(attr=key)
+        # items = [item]
+        # items_by_attr = {k: v for k, v in groupby(items, lambda d: d.attr)}
+        # print("items_by_attr=", items_by_attr)
+        # item_with_key = list(items_by_attr[key]) if key in items_by_attr else []
+        # print("item_with_key=", item_with_key)
+        #
+        # Python 3.7+:
+        # items_by_attr= {'key': <itertools._grouper object at 0x7f3b9f38d4d0>}
+        # item_with_key= []
+        #
+        # Python 3.6:
+        # items_by_attr= {'key': <itertools._grouper object at 0x101128630>}
+        # item_with_key= [<MagicMock id='4310405416'>]
 
 Review comment:
   +1 on the comment change recommended by @ashb. Based on the official docs, the behaviour in both 3.6 and 3.7 is expected and within the API spec: returned group items are transient, invalidated on each iteration, and should be copied out manually if they are needed later.

[GitHub] [airflow] kaxil commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances
URL: https://github.com/apache/airflow/pull/7489#discussion_r382941092
 
 

 ##########
 File path: airflow/models/dagrun.py
 ##########
 @@ -125,14 +125,16 @@ def refresh_from_db(self, session=None):
 
     @staticmethod
     @provide_session
-    def find(dag_id=None, run_id=None, execution_date=None,
+    def find(dag_id=None, dag_ids=None, run_id=None, execution_date=None,
 
 Review comment:
   Do we need a separate argument for this? We could make `dag_id` support both `str` and `list`.
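   
   A rough sketch of what that could look like (toy stand-in, not the real `DagRun.find`):
   
   ```python
   from typing import Iterable, List, Optional, Union
   
   def find(dag_id: Optional[Union[str, Iterable[str]]] = None,
            runs: Optional[List[dict]] = None) -> List[dict]:
       """Toy filter: accepts a single dag_id or an iterable of them."""
       runs = runs or []
       if dag_id is None:
           return runs
       wanted = {dag_id} if isinstance(dag_id, str) else set(dag_id)
       return [r for r in runs if r["dag_id"] in wanted]
   
   runs = [{"dag_id": "a"}, {"dag_id": "b"}]
   assert find("a", runs=runs) == [{"dag_id": "a"}]
   assert find(["a", "b"], runs=runs) == runs
   ```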

[GitHub] [airflow] kaxil commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances
URL: https://github.com/apache/airflow/pull/7489#discussion_r382940926
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -699,23 +727,30 @@ def _process_dags(self, dagbag, dags, tis_out):
                 self.log.info("Not processing DAG %s since it's paused", dag.dag_id)
                 continue
 
-            self.log.info("Processing %s", dag.dag_id)
+            dag_id = dag.dag_id
+            self.log.info("Processing %s", dag_id)
+            dag_runs_for_dag = dag_runs_by_dag_id[dag_id] if dag_id in dag_runs_by_dag_id else []
 
             # Only creates DagRun for DAGs that are not subdag since
             # DagRun of subdags are created when SubDagOperator executes.
             if not dag.is_subdag:
                 dag_run = self.create_dag_run(dag)
                 if dag_run:
+                    dag_runs_for_dag.append(dag_run)
                     expected_start_date = dag.following_schedule(dag_run.execution_date)
                     if expected_start_date:
                         schedule_delay = dag_run.start_date - expected_start_date
                         Stats.timing(
                             'dagrun.schedule_delay.{dag_id}'.format(dag_id=dag.dag_id),
                             schedule_delay)
-                self.log.info("Created %s", dag_run)
-            self._process_task_instances(dag, tis_out)
-            if check_slas:
-                self.manage_slas(dag)
+                    self.log.info("Created %s", dag_run)
+
+            if dag_runs_for_dag:
+                tis_out.extend(self._process_task_instances(dag, dag_runs_for_dag))
+                if check_slas:
+                    self.manage_slas(dag)
 
 Review comment:
   Should the `check_slas` be outside `if dag_runs_for_dag:`?

[GitHub] [airflow] mik-laj commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances
URL: https://github.com/apache/airflow/pull/7489#discussion_r383814563
 
 

 ##########
 File path: airflow/models/dagrun.py
 ##########
 @@ -125,14 +125,16 @@ def refresh_from_db(self, session=None):
 
     @staticmethod
     @provide_session
-    def find(dag_id=None, run_id=None, execution_date=None,
+    def find(dag_id=None, dag_ids=None, run_id=None, execution_date=None,
 
 Review comment:
   In this PR, I want to focus only on the necessary changes. I want to add type hints in a separate PR, because type hints in the core are problematic: they cause circular imports.

[GitHub] [airflow] saguziel commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances

Posted by GitBox <gi...@apache.org>.
saguziel commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances
URL: https://github.com/apache/airflow/pull/7489#discussion_r383544228
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -699,23 +727,30 @@ def _process_dags(self, dagbag, dags, tis_out):
                 self.log.info("Not processing DAG %s since it's paused", dag.dag_id)
                 continue
 
-            self.log.info("Processing %s", dag.dag_id)
+            dag_id = dag.dag_id
+            self.log.info("Processing %s", dag_id)
+            dag_runs_for_dag = dag_runs_by_dag_id[dag_id] if dag_id in dag_runs_by_dag_id else []
 
 Review comment:
   Use `dag_runs_by_dag_id.get(dag_id, [])` instead, please.
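   
   A quick check that the two spellings agree (hypothetical data):
   
   ```python
   dag_runs_by_dag_id = {"example_dag": ["run_1"]}
   
   for dag_id in ("example_dag", "missing_dag"):
       verbose = dag_runs_by_dag_id[dag_id] if dag_id in dag_runs_by_dag_id else []
       concise = dag_runs_by_dag_id.get(dag_id, [])
       # .get() does one lookup instead of a membership test plus an index.
       assert verbose == concise
   ```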

[GitHub] [airflow] Fokko commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances

Posted by GitBox <gi...@apache.org>.
Fokko commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances
URL: https://github.com/apache/airflow/pull/7489#discussion_r383706748
 
 

 ##########
 File path: airflow/models/dagrun.py
 ##########
 @@ -125,14 +125,16 @@ def refresh_from_db(self, session=None):
 
     @staticmethod
     @provide_session
-    def find(dag_id=None, run_id=None, execution_date=None,
+    def find(dag_id=None, dag_ids=None, run_id=None, execution_date=None,
 
 Review comment:
   Can we add type annotations here as well?

[GitHub] [airflow] mik-laj commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances
URL: https://github.com/apache/airflow/pull/7489#discussion_r383451630
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -684,11 +687,36 @@ def _process_dags(self, dagbag, dags, tis_out):
         :type dagbag: airflow.models.DagBag
         :param dags: the DAGs from the DagBag to process
         :type dags: List[airflow.models.DAG]
-        :param tis_out: A list to add generated TaskInstance objects
-        :type tis_out: list[TaskInstance]
-        :rtype: None
+        :rtype: list[TaskInstance]
+        :return: A list of generated TaskInstance objects
         """
         check_slas = conf.getboolean('core', 'CHECK_SLAS', fallback=True)
+
+        tis_out = []
+        dag_ids = [dag.dag_id for dag in dags]
+        dag_runs = DagRun.find(dag_ids=dag_ids, state=State.RUNNING, session=session)
+        # list() is needed because of a bug in Python 3.7+
+        #
+        # The following code returns different values depending on the Python version
+        # from itertools import groupby
+        # from unittest.mock import MagicMock
+        # key = "key"
+        # item = MagicMock(attr=key)
+        # items = [item]
+        # items_by_attr = {k: v for k, v in groupby(items, lambda d: d.attr)}
+        # print("items_by_attr=", items_by_attr)
+        # item_with_key = list(items_by_attr[key]) if key in items_by_attr else []
+        # print("item_with_key=", item_with_key)
+        #
+        # Python 3.7+:
+        # items_by_attr= {'key': <itertools._grouper object at 0x7f3b9f38d4d0>}
+        # item_with_key= []
+        #
+        # Python 3.6:
+        # items_by_attr= {'key': <itertools._grouper object at 0x101128630>}
+        # item_with_key= [<MagicMock id='4310405416'>]
 
 Review comment:
   I will update the description during the next rebase.

[GitHub] [airflow] ashb commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances
URL: https://github.com/apache/airflow/pull/7489#discussion_r383253824
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -684,11 +687,36 @@ def _process_dags(self, dagbag, dags, tis_out):
         :type dagbag: airflow.models.DagBag
         :param dags: the DAGs from the DagBag to process
         :type dags: List[airflow.models.DAG]
-        :param tis_out: A list to add generated TaskInstance objects
-        :type tis_out: list[TaskInstance]
-        :rtype: None
+        :rtype: list[TaskInstance]
+        :return: A list of generated TaskInstance objects
         """
         check_slas = conf.getboolean('core', 'CHECK_SLAS', fallback=True)
+
+        tis_out = []
+        dag_ids = [dag.dag_id for dag in dags]
+        dag_runs = DagRun.find(dag_ids=dag_ids, state=State.RUNNING, session=session)
+        # list() is needed because of a bug in Python 3.7+
+        #
+        # The following code returns different values depending on the Python version
+        # from itertools import groupby
+        # from unittest.mock import MagicMock
+        # key = "key"
+        # item = MagicMock(attr=key)
+        # items = [item]
+        # items_by_attr = {k: v for k, v in groupby(items, lambda d: d.attr)}
+        # print("items_by_attr=", items_by_attr)
+        # item_with_key = list(items_by_attr[key]) if key in items_by_attr else []
+        # print("item_with_key=", item_with_key)
+        #
+        # Python 3.7+:
+        # items_by_attr= {'key': <itertools._grouper object at 0x7f3b9f38d4d0>}
+        # item_with_key= []
+        #
+        # Python 3.6:
+        # items_by_attr= {'key': <itertools._grouper object at 0x101128630>}
+        # item_with_key= [<MagicMock id='4310405416'>]
 
 Review comment:
   The behaviour is different on py3.6 and 3.7, but it is still wrong on both when more than a single item is in `items`: 3.6 would return the last item only.
   
   The [docs for groupby](https://docs.python.org/3/library/itertools.html#itertools.groupby) say:
   
   > Because the source is shared, when the groupby() object is advanced, the previous group is no longer visible. So, if that data is needed later, it should be stored as a list.
   
   Since the behaviour without list is broken in otherways on 3.6 too I think we can just replace this comment with:
   
   ```
            # As per the docs of groupby (https://docs.python.org/3/library/itertools.html#itertools.groupby)
            # we need to use `list()` otherwise the result will be wrong/incomplete
   ```
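   
   A minimal runnable demonstration of the documented behaviour (output shown for CPython 3.7+):
   
   ```python
   from itertools import groupby
   
   items = ["a", "a", "b"]
   
   # Without list(): groups share the underlying iterator, so once groupby()
   # has advanced past a group, reading that group later yields nothing.
   lazy = {key: group for key, group in groupby(items)}
   print([list(group) for group in lazy.values()])  # [[], []]
   
   # With list(): each group is copied out before groupby() advances.
   eager = {key: list(group) for key, group in groupby(items)}
   print(eager)  # {'a': ['a', 'a'], 'b': ['b']}
   ```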

[GitHub] [airflow] mik-laj commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances
URL: https://github.com/apache/airflow/pull/7489#discussion_r382941484
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -699,23 +727,30 @@ def _process_dags(self, dagbag, dags, tis_out):
                 self.log.info("Not processing DAG %s since it's paused", dag.dag_id)
                 continue
 
-            self.log.info("Processing %s", dag.dag_id)
+            dag_id = dag.dag_id
+            self.log.info("Processing %s", dag_id)
+            dag_runs_for_dag = dag_runs_by_dag_id[dag_id] if dag_id in dag_runs_by_dag_id else []
 
             # Only creates DagRun for DAGs that are not subdag since
             # DagRun of subdags are created when SubDagOperator executes.
             if not dag.is_subdag:
                 dag_run = self.create_dag_run(dag)
                 if dag_run:
+                    dag_runs_for_dag.append(dag_run)
                     expected_start_date = dag.following_schedule(dag_run.execution_date)
                     if expected_start_date:
                         schedule_delay = dag_run.start_date - expected_start_date
                         Stats.timing(
                             'dagrun.schedule_delay.{dag_id}'.format(dag_id=dag.dag_id),
                             schedule_delay)
-                self.log.info("Created %s", dag_run)
-            self._process_task_instances(dag, tis_out)
-            if check_slas:
-                self.manage_slas(dag)
+                    self.log.info("Created %s", dag_run)
+
+            if dag_runs_for_dag:
+                tis_out.extend(self._process_task_instances(dag, dag_runs_for_dag))
+                if check_slas:
+                    self.manage_slas(dag)
 
 Review comment:
   No, because there must be a DagRun if a task was ever to be executed. It makes no sense to check SLAs when the DAG has never been run.

[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns

Posted by GitBox <gi...@apache.org>.
nuclearpinguin commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns
URL: https://github.com/apache/airflow/pull/7489#discussion_r382907514
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -700,22 +728,29 @@ def _process_dags(self, dagbag, dags, tis_out):
                 continue
 
             self.log.info("Processing %s", dag.dag_id)
+            dag_id = dag.dag_id
+            dag_runs_for_dag = list(dag_runs_by_dag_id[dag.dag_id]) if dag_id in dag_runs_by_dag_id else []
 
 Review comment:
   ```suggestion
               dag_runs_for_dag = list(dag_runs_by_dag_id[dag_id]) if dag_id in dag_runs_by_dag_id else []
   ```

[GitHub] [airflow] ashb commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances
URL: https://github.com/apache/airflow/pull/7489#discussion_r383254411
 
 

 ##########
 File path: airflow/models/dagrun.py
 ##########
 @@ -125,14 +125,16 @@ def refresh_from_db(self, session=None):
 
     @staticmethod
     @provide_session
-    def find(dag_id=None, run_id=None, execution_date=None,
+    def find(dag_id=None, dag_ids=None, run_id=None, execution_date=None,
 
 Review comment:
   Yeah, that's a fairly common pattern in our other methods.

[GitHub] [airflow] mik-laj commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances
URL: https://github.com/apache/airflow/pull/7489#discussion_r383814846
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -631,16 +632,16 @@ def create_dag_run(self, dag, session=None):
                 return next_run
 
     @provide_session
-    def _process_task_instances(self, dag, task_instances_list, session=None):
+    def _process_task_instances(self, dag, dag_runs, session=None):
 
 Review comment:
   In this PR, I want to focus only on the necessary changes. I want to add type hints in a separate PR, because type hints in the core are problematic: they cause circular imports.

[GitHub] [airflow] mik-laj commented on a change in pull request #7489: [AIRFLOW-6869][DONT-MERGE] Bulk fetch DAGRuns for _process_task_instances

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7489: [AIRFLOW-6869][DONT-MERGE] Bulk fetch DAGRuns for _process_task_instances
URL: https://github.com/apache/airflow/pull/7489#discussion_r384940547
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -684,11 +687,36 @@ def _process_dags(self, dagbag, dags, tis_out):
         :type dagbag: airflow.models.DagBag
         :param dags: the DAGs from the DagBag to process
         :type dags: List[airflow.models.DAG]
-        :param tis_out: A list to add generated TaskInstance objects
-        :type tis_out: list[TaskInstance]
-        :rtype: None
+        :rtype: list[TaskInstance]
+        :return: A list of generated TaskInstance objects
         """
         check_slas = conf.getboolean('core', 'CHECK_SLAS', fallback=True)
+
+        tis_out = []
+        dag_ids = [dag.dag_id for dag in dags]
 
 Review comment:
   A list is a little faster:
   https://stackoverflow.com/questions/2831212/python-sets-vs-lists
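   
   A micro-benchmark sketch for checking the claim (illustrative only; absolute numbers vary by machine and Python version):
   
   ```python
   import timeit
   
   setup = "ids = ['dag_%d' % i for i in range(200)]"
   list_time = timeit.timeit("[i for i in ids]", setup=setup, number=100_000)
   set_time = timeit.timeit("{i for i in ids}", setup=setup, number=100_000)
   # Building a list skips hashing and deduplication, so it is slightly cheaper.
   print("list: %.3fs  set: %.3fs" % (list_time, set_time))
   ```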

[GitHub] [airflow] mik-laj closed pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns

Posted by GitBox <gi...@apache.org>.
mik-laj closed pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns
URL: https://github.com/apache/airflow/pull/7489
 
 
   

[GitHub] [airflow] mik-laj commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances
URL: https://github.com/apache/airflow/pull/7489#discussion_r383032005
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -699,23 +727,30 @@ def _process_dags(self, dagbag, dags, tis_out):
                 self.log.info("Not processing DAG %s since it's paused", dag.dag_id)
                 continue
 
-            self.log.info("Processing %s", dag.dag_id)
+            dag_id = dag.dag_id
+            self.log.info("Processing %s", dag_id)
+            dag_runs_for_dag = dag_runs_by_dag_id[dag_id] if dag_id in dag_runs_by_dag_id else []
 
             # Only creates DagRun for DAGs that are not subdag since
             # DagRun of subdags are created when SubDagOperator executes.
             if not dag.is_subdag:
                 dag_run = self.create_dag_run(dag)
                 if dag_run:
+                    dag_runs_for_dag.append(dag_run)
                     expected_start_date = dag.following_schedule(dag_run.execution_date)
                     if expected_start_date:
                         schedule_delay = dag_run.start_date - expected_start_date
                         Stats.timing(
                             'dagrun.schedule_delay.{dag_id}'.format(dag_id=dag.dag_id),
                             schedule_delay)
-                self.log.info("Created %s", dag_run)
-            self._process_task_instances(dag, tis_out)
-            if check_slas:
-                self.manage_slas(dag)
+                    self.log.info("Created %s", dag_run)
+
+            if dag_runs_for_dag:
+                tis_out.extend(self._process_task_instances(dag, dag_runs_for_dag))
+                if check_slas:
+                    self.manage_slas(dag)
 
 Review comment:
   If a task in a DagRun was executed, then there must be at least one active DagRun. The `DagRun.update_state` method is called by `_process_task_instances`. First, we fetch the runs that are running, and only then change them from running to another state. If no run was running, then no task should be running.

[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns

Posted by GitBox <gi...@apache.org>.
nuclearpinguin commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns
URL: https://github.com/apache/airflow/pull/7489#discussion_r382907491
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -700,22 +728,29 @@ def _process_dags(self, dagbag, dags, tis_out):
                 continue
 
             self.log.info("Processing %s", dag.dag_id)
+            dag_id = dag.dag_id
+            dag_runs_for_dag = list(dag_runs_by_dag_id[dag.dag_id]) if dag_id in dag_runs_by_dag_id else []
 
 Review comment:
   I think `dag_runs_by_dag_id[dag.dag_id]` is already a list, so there is no need for another cast?

[GitHub] [airflow] Fokko commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances

Posted by GitBox <gi...@apache.org>.
Fokko commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances
URL: https://github.com/apache/airflow/pull/7489#discussion_r383705945
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -631,16 +632,16 @@ def create_dag_run(self, dag, session=None):
                 return next_run
 
     @provide_session
-    def _process_task_instances(self, dag, task_instances_list, session=None):
+    def _process_task_instances(self, dag, dag_runs, session=None):
 
 Review comment:
   Can we add type annotations here as well?

[GitHub] [airflow] kaxil commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances
URL: https://github.com/apache/airflow/pull/7489#discussion_r382944080
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -699,23 +727,30 @@ def _process_dags(self, dagbag, dags, tis_out):
                 self.log.info("Not processing DAG %s since it's paused", dag.dag_id)
                 continue
 
-            self.log.info("Processing %s", dag.dag_id)
+            dag_id = dag.dag_id
+            self.log.info("Processing %s", dag_id)
+            dag_runs_for_dag = dag_runs_by_dag_id[dag_id] if dag_id in dag_runs_by_dag_id else []
 
             # Only creates DagRun for DAGs that are not subdag since
             # DagRun of subdags are created when SubDagOperator executes.
             if not dag.is_subdag:
                 dag_run = self.create_dag_run(dag)
                 if dag_run:
+                    dag_runs_for_dag.append(dag_run)
                     expected_start_date = dag.following_schedule(dag_run.execution_date)
                     if expected_start_date:
                         schedule_delay = dag_run.start_date - expected_start_date
                         Stats.timing(
                             'dagrun.schedule_delay.{dag_id}'.format(dag_id=dag.dag_id),
                             schedule_delay)
-                self.log.info("Created %s", dag_run)
-            self._process_task_instances(dag, tis_out)
-            if check_slas:
-                self.manage_slas(dag)
+                    self.log.info("Created %s", dag_run)
+
+            if dag_runs_for_dag:
+                tis_out.extend(self._process_task_instances(dag, dag_runs_for_dag))
+                if check_slas:
+                    self.manage_slas(dag)
 
 Review comment:
   I am thinking of a case where a DagRun has just completed and the DAG (or its last task) has missed its SLA. In that case the DagRun won't be in the RUNNING state and would be filtered out by the following check:
   
   ```
   dag_runs = DagRun.find(dag_ids=dag_ids, state=State.RUNNING, session=session)
   ```
   
   And SLAs won't be triggered for such a task.

[GitHub] [airflow] codecov-io commented on issue #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances

Posted by GitBox <gi...@apache.org>.
codecov-io commented on issue #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances
URL: https://github.com/apache/airflow/pull/7489#issuecomment-589953700
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7489?src=pr&el=h1) Report
   > Merging [#7489](https://codecov.io/gh/apache/airflow/pull/7489?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/4e0e2f0da39626c1bc8026a33c2080a6b11f2c41?src=pr&el=desc) will **decrease** coverage by `0.25%`.
   > The diff coverage is `96.66%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7489/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7489?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #7489      +/-   ##
   ==========================================
   - Coverage   86.79%   86.54%   -0.26%     
   ==========================================
     Files         887      891       +4     
     Lines       41976    42110     +134     
   ==========================================
   + Hits        36432    36442      +10     
   - Misses       5544     5668     +124
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7489?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/jobs/scheduler\_job.py](https://codecov.io/gh/apache/airflow/pull/7489/diff?src=pr&el=tree#diff-YWlyZmxvdy9qb2JzL3NjaGVkdWxlcl9qb2IucHk=) | `90.11% <100%> (-0.13%)` | :arrow_down: |
   | [airflow/models/dagrun.py](https://codecov.io/gh/apache/airflow/pull/7489/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFncnVuLnB5) | `96.2% <83.33%> (-0.37%)` | :arrow_down: |
   | [airflow/kubernetes/volume\_mount.py](https://codecov.io/gh/apache/airflow/pull/7489/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZV9tb3VudC5weQ==) | `44.44% <0%> (-55.56%)` | :arrow_down: |
   | [airflow/kubernetes/volume.py](https://codecov.io/gh/apache/airflow/pull/7489/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZS5weQ==) | `52.94% <0%> (-47.06%)` | :arrow_down: |
   | [airflow/kubernetes/pod\_launcher.py](https://codecov.io/gh/apache/airflow/pull/7489/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3BvZF9sYXVuY2hlci5weQ==) | `47.18% <0%> (-45.08%)` | :arrow_down: |
   | [...viders/cncf/kubernetes/operators/kubernetes\_pod.py](https://codecov.io/gh/apache/airflow/pull/7489/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvY25jZi9rdWJlcm5ldGVzL29wZXJhdG9ycy9rdWJlcm5ldGVzX3BvZC5weQ==) | `69.38% <0%> (-25.52%)` | :arrow_down: |
   | [airflow/kubernetes/refresh\_config.py](https://codecov.io/gh/apache/airflow/pull/7489/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3JlZnJlc2hfY29uZmlnLnB5) | `50.98% <0%> (-23.53%)` | :arrow_down: |
   | [airflow/utils/file.py](https://codecov.io/gh/apache/airflow/pull/7489/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9maWxlLnB5) | `87.83% <0%> (-1.36%)` | :arrow_down: |
   | [airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/7489/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==) | `87.93% <0%> (-0.2%)` | :arrow_down: |
   | [airflow/models/baseoperator.py](https://codecov.io/gh/apache/airflow/pull/7489/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvYmFzZW9wZXJhdG9yLnB5) | `96.52% <0%> (ø)` | :arrow_up: |
   | ... and [27 more](https://codecov.io/gh/apache/airflow/pull/7489/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7489?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7489?src=pr&el=footer). Last update [4e0e2f0...f94e1fc](https://codecov.io/gh/apache/airflow/pull/7489?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   

[GitHub] [airflow] mik-laj merged pull request #7489: [AIRFLOW-6869] Bulk fetch DAGRuns for _process_task_instances

Posted by GitBox <gi...@apache.org>.
mik-laj merged pull request #7489: [AIRFLOW-6869] Bulk fetch DAGRuns for _process_task_instances
URL: https://github.com/apache/airflow/pull/7489
 
 
   

[GitHub] [airflow] Fokko commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances

Posted by GitBox <gi...@apache.org>.
Fokko commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances
URL: https://github.com/apache/airflow/pull/7489#discussion_r383706497
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -684,11 +687,36 @@ def _process_dags(self, dagbag, dags, tis_out):
         :type dagbag: airflow.models.DagBag
         :param dags: the DAGs from the DagBag to process
         :type dags: List[airflow.models.DAG]
-        :param tis_out: A list to add generated TaskInstance objects
-        :type tis_out: list[TaskInstance]
-        :rtype: None
+        :rtype: list[TaskInstance]
+        :return: A list of generated TaskInstance objects
         """
         check_slas = conf.getboolean('core', 'CHECK_SLAS', fallback=True)
+
+        tis_out = []
+        dag_ids = [dag.dag_id for dag in dags]
 
 Review comment:
   ```suggestion
           dag_ids = {dag.dag_id for dag in dags}
   ```
   This will turn it into a set :)

[GitHub] [airflow] Fokko commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances

Posted by GitBox <gi...@apache.org>.
Fokko commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances
URL: https://github.com/apache/airflow/pull/7489#discussion_r383706008
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -631,16 +632,16 @@ def create_dag_run(self, dag, session=None):
                 return next_run
 
     @provide_session
-    def _process_task_instances(self, dag, task_instances_list, session=None):
+    def _process_task_instances(self, dag, dag_runs, session=None):
         """
         This method schedules the tasks for a single DAG by looking at the
         active DAG runs and adding task instances that should run to the
         queue.
         """
 
         # update the state of the previously active dag runs
-        dag_runs = DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)
         active_dag_runs = 0
+        task_instances_list = []
 
 Review comment:
   Nice one, this makes the function far less awkward 

[GitHub] [airflow] mik-laj commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns
URL: https://github.com/apache/airflow/pull/7489#discussion_r382908580
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -700,22 +728,29 @@ def _process_dags(self, dagbag, dags, tis_out):
                 continue
 
             self.log.info("Processing %s", dag.dag_id)
+            dag_id = dag.dag_id
+            dag_runs_for_dag = list(dag_runs_by_dag_id[dag.dag_id]) if dag_id in dag_runs_by_dag_id else []
 
 Review comment:
   Fixed. Thanks.

[GitHub] [airflow] kaxil commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances
URL: https://github.com/apache/airflow/pull/7489#discussion_r383042987
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -699,23 +727,30 @@ def _process_dags(self, dagbag, dags, tis_out):
                 self.log.info("Not processing DAG %s since it's paused", dag.dag_id)
                 continue
 
-            self.log.info("Processing %s", dag.dag_id)
+            dag_id = dag.dag_id
+            self.log.info("Processing %s", dag_id)
+            dag_runs_for_dag = dag_runs_by_dag_id[dag_id] if dag_id in dag_runs_by_dag_id else []
 
             # Only creates DagRun for DAGs that are not subdag since
             # DagRun of subdags are created when SubDagOperator executes.
             if not dag.is_subdag:
                 dag_run = self.create_dag_run(dag)
                 if dag_run:
+                    dag_runs_for_dag.append(dag_run)
                     expected_start_date = dag.following_schedule(dag_run.execution_date)
                     if expected_start_date:
                         schedule_delay = dag_run.start_date - expected_start_date
                         Stats.timing(
                             'dagrun.schedule_delay.{dag_id}'.format(dag_id=dag.dag_id),
                             schedule_delay)
-                self.log.info("Created %s", dag_run)
-            self._process_task_instances(dag, tis_out)
-            if check_slas:
-                self.manage_slas(dag)
+                    self.log.info("Created %s", dag_run)
+
+            if dag_runs_for_dag:
+                tis_out.extend(self._process_task_instances(dag, dag_runs_for_dag))
+                if check_slas:
+                    self.manage_slas(dag)
 
 Review comment:
   Yes, you are right

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services