Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/02/20 21:34:34 UTC

[GitHub] [airflow] mik-laj opened a new pull request #7476: [AIRFLOW-6856] Bulk fetch paused_dag_ids

mik-laj opened a new pull request #7476: [AIRFLOW-6856] Bulk fetch paused_dag_ids
URL: https://github.com/apache/airflow/pull/7476
 
 
   I created the following DAG file:
   ```python
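   from datetime import timedelta

   from airflow.models import DAG
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.utils.dates import days_ago

   args = {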
       'owner': 'airflow',
       'start_date': days_ago(3),
   }
   
   
   def create_dag(dag_number):
       dag = DAG(
           dag_id=f'perf_50_dag_dummy_tasks_{dag_number}_of_50', default_args=args,
           schedule_interval=None,
           dagrun_timeout=timedelta(minutes=60)
       )
   
       # Note: range(1, 5) yields four tasks (1..4), although the task IDs say "of_5".
       for j in range(1, 5):
           DummyOperator(
               task_id='task_{}_of_5'.format(j),
               dag=dag
           )
   
       return dag
   
   
   # Note: range(1, 50) yields 49 DAGs (1..49), although the DAG IDs say "of_50".
   for i in range(1, 50):
       globals()[f"dag_{i}"] = create_dag(i)
   
   ```
   I then used the following script to measure performance:
   ```python
   import functools
   import logging
   
   import time
   
   from airflow.jobs.scheduler_job import DagFileProcessor
   
   
   class CountQueries(object):
       """Context manager that counts SQL statements executed on the Airflow engine."""
   
       def __init__(self):
           self.count = 0
   
       def __enter__(self):
           from sqlalchemy import event
           from airflow.settings import engine
           event.listen(engine, "after_cursor_execute", self.after_cursor_execute)
           return None
   
       def after_cursor_execute(self, *args, **kwargs):
           self.count += 1
   
       def __exit__(self, type, value, traceback):
           from sqlalchemy import event
           from airflow.settings import engine
           event.remove(engine, "after_cursor_execute", self.after_cursor_execute)
           print('Query count: ', self.count)
   
   
   count_queries = CountQueries
   
   DAG_FILE = "/files/dags/50_dag_5_dummy_tasks.py"
   
   log = logging.getLogger("airflow.processor")
   processor = DagFileProcessor([], log)
   
   def timing(f):
       """Run ``f`` RETRY_COUNT times and print the average wall-clock time in ms."""
   
       @functools.wraps(f)
       def wrap(*args):
           RETRY_COUNT = 5
           r = []
           for i in range(RETRY_COUNT):
               time1 = time.time()
               f(*args)
               time2 = time.time()
               diff = (time2 - time1) * 1000.0
               r.append(diff)
               # print('Retry %d took %0.3f ms' % (i, diff))
           print('Average took %0.3f ms' % (sum(r) / RETRY_COUNT))
   
       return wrap
   
   
   @timing
   def slow_case():
       with count_queries():
           processor.process_file(DAG_FILE, None, pickle_dags=False)
   
   slow_case()
   ```
   
   I obtained the following results:
   **Before:**
   Query count: 442
   Average time:  1182.187 ms
   
   **After:**
   Query count: 297 
   Average time: 769.421 ms  
   
   **Diff:**
   Query count: -145 (-32.8%)
   Average time: -413 ms (-34.9%)
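
The relative figures in the diff follow directly from the before/after measurements. A minimal sketch of that arithmetic, where the helper name `relative_change` is hypothetical and not part of the PR:

```python
def relative_change(before: float, after: float) -> float:
    """Return the change from ``before`` to ``after`` as a percentage of ``before``."""
    return (after - before) / before * 100.0


# Figures reported in the Before/After sections above.
query_delta = relative_change(442, 297)
time_delta = relative_change(1182.187, 769.421)

print(f"Query count: {297 - 442} ({query_delta:.1f}%)")
print(f"Average time: {769.421 - 1182.187:.0f} ms ({time_delta:.1f}%)")
```

Both values are relative to the "Before" measurement, so a negative number means fewer queries or less time.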
   
   ---
   Issue link: WILL BE INSERTED BY [boring-cyborg](https://github.com/kaxil/boring-cyborg)
   
   Make sure to mark the boxes below before creating PR: [x]
   
   - [ ] Description above provides context of the change
   - [ ] Commit message/PR title starts with `[AIRFLOW-NNNN]`. AIRFLOW-NNNN = JIRA ID<sup>*</sup>
   - [ ] Unit tests coverage for changes (not needed for documentation changes)
   - [ ] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [ ] Relevant documentation is updated including usage instructions.
   - [ ] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   <sup>*</sup> For document-only changes commit message can start with `[AIRFLOW-XXXX]`.
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [airflow] ashb commented on a change in pull request #7476: [AIRFLOW-6856] Bulk fetch paused_dag_ids

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7476: [AIRFLOW-6856] Bulk fetch paused_dag_ids
URL: https://github.com/apache/airflow/pull/7476#discussion_r382476663
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -780,7 +788,12 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None):
         for dag in dagbag.dags.values():
             dag.sync_to_db()
 
-        paused_dag_ids = {dag.dag_id for dag in dagbag.dags.values() if dag.is_paused}
+        paused_dag_ids = (
+            session.query(DagModel.dag_id)
+            .filter(DagModel.is_paused.is_(True))
+            .filter(DagModel.dag_id.in_(dagbag.dag_ids))
+            .all()
+        )
 
 Review comment:
   A few lines after this we call self._process_dags, which then makes this same query again. Is it worth passing it in instead?
   
   The other thing I'm wondering, API-wise, is whether this should be encapsulated inside DagBag (something like a `dagbag.paused_dag_ids` method or accessor), but that may not play very well with the global/long-lived DagBag object the webserver has.
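
To illustrate why one bulk query helps, here is a minimal, self-contained sketch that uses the standard-library `sqlite3` module rather than Airflow or SQLAlchemy. The `dag` table, its columns, and the sample IDs are assumptions made up for the example. It counts the statements issued by a per-DAG lookup and by a single `IN` query.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, is_paused INTEGER NOT NULL)")
dag_ids = [f"dag_{i}" for i in range(1, 11)]
conn.executemany(
    "INSERT INTO dag (dag_id, is_paused) VALUES (?, ?)",
    [(dag_id, int(i % 2 == 0)) for i, dag_id in enumerate(dag_ids)],
)
conn.commit()

statements = []
# Record every SQL statement executed on this connection from here on.
conn.set_trace_callback(statements.append)

# Per-DAG lookup: one SELECT for each DAG.
paused_one_by_one = set()
for dag_id in dag_ids:
    row = conn.execute("SELECT is_paused FROM dag WHERE dag_id = ?", (dag_id,)).fetchone()
    if row[0]:
        paused_one_by_one.add(dag_id)
per_dag_queries = len(statements)

# Bulk lookup: a single SELECT with an IN clause.
statements.clear()
placeholders = ", ".join("?" for _ in dag_ids)
rows = conn.execute(
    f"SELECT dag_id FROM dag WHERE is_paused = 1 AND dag_id IN ({placeholders})",
    dag_ids,
).fetchall()
# Rows come back as 1-tuples, so unpack them to get a set of plain IDs.
paused_bulk = {dag_id for (dag_id,) in rows}
bulk_queries = len(statements)

print(per_dag_queries, bulk_queries)
```

Once the set has been computed, passing it to the callee as an argument, as suggested above, would avoid issuing the same query a second time.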


[GitHub] [airflow] Fokko commented on a change in pull request #7476: [AIRFLOW-6856][depends on AIRFLOW-6862][WIP] Bulk fetch paused_dag_ids

Posted by GitBox <gi...@apache.org>.
Fokko commented on a change in pull request #7476: [AIRFLOW-6856][depends on AIRFLOW-6862][WIP] Bulk fetch paused_dag_ids
URL: https://github.com/apache/airflow/pull/7476#discussion_r383692759
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -672,33 +672,24 @@ def _process_task_instances(self, dag, task_instances_list, session=None):
                     self.log.debug('Queuing task: %s', ti)
                     task_instances_list.append(ti.key)
 
-    def _process_dags(self, dagbag, dags, tis_out):
+    @provide_session
+    def _process_dags(self, dags, tis_out, session=None):
 
 Review comment:
   While touching this line, can we add type annotations?


[GitHub] [airflow] codecov-io edited a comment on issue #7476: [AIRFLOW-6856][depends on AIRFLOW-6862][WIP] Bulk fetch paused_dag_ids

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7476: [AIRFLOW-6856][depends on AIRFLOW-6862][WIP] Bulk fetch paused_dag_ids
URL: https://github.com/apache/airflow/pull/7476#issuecomment-589383726
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=h1) Report
   > Merging [#7476](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/0ec2774120d43fa667a371b384e6006e1d1c7821?src=pr&el=desc) will **decrease** coverage by `0.26%`.
   > The diff coverage is `100%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7476/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #7476      +/-   ##
   ==========================================
   - Coverage   86.81%   86.55%   -0.27%     
   ==========================================
     Files         893      893              
     Lines       42193    42342     +149     
   ==========================================
   + Hits        36629    36647      +18     
   - Misses       5564     5695     +131
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==) | `88.63% <ø> (+0.7%)` | :arrow_up: |
   | [airflow/dag/base\_dag.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9kYWcvYmFzZV9kYWcucHk=) | `69.56% <ø> (+1.56%)` | :arrow_up: |
   | [airflow/jobs/scheduler\_job.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9qb2JzL3NjaGVkdWxlcl9qb2IucHk=) | `90.72% <100%> (+0.48%)` | :arrow_up: |
   | [airflow/kubernetes/volume\_mount.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZV9tb3VudC5weQ==) | `44.44% <0%> (-55.56%)` | :arrow_down: |
   | [airflow/kubernetes/volume.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZS5weQ==) | `52.94% <0%> (-47.06%)` | :arrow_down: |
   | [airflow/kubernetes/pod\_launcher.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3BvZF9sYXVuY2hlci5weQ==) | `47.18% <0%> (-45.08%)` | :arrow_down: |
   | [...viders/cncf/kubernetes/operators/kubernetes\_pod.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvY25jZi9rdWJlcm5ldGVzL29wZXJhdG9ycy9rdWJlcm5ldGVzX3BvZC5weQ==) | `69.69% <0%> (-25.26%)` | :arrow_down: |
   | [airflow/kubernetes/refresh\_config.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3JlZnJlc2hfY29uZmlnLnB5) | `50.98% <0%> (-23.53%)` | :arrow_down: |
   | [airflow/providers/amazon/aws/hooks/sns.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvYW1hem9uL2F3cy9ob29rcy9zbnMucHk=) | `96.42% <0%> (-3.58%)` | :arrow_down: |
   | [airflow/models/dagrun.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFncnVuLnB5) | `95.81% <0%> (-0.76%)` | :arrow_down: |
   | ... and [9 more](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=footer). Last update [0ec2774...72478de](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [airflow] mik-laj commented on a change in pull request #7476: [AIRFLOW-6856][depends on AIRFLOW-6857, AIRFLOW-6862] Bulk fetch paused_dag_ids

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7476: [AIRFLOW-6856][depends on AIRFLOW-6857,AIRFLOW-6862] Bulk fetch paused_dag_ids
URL: https://github.com/apache/airflow/pull/7476#discussion_r382560195
 
 

 ##########
 File path: airflow/models/dag.py
 ##########
 @@ -1448,63 +1448,88 @@ def create_dagrun(self,
 
         return run
 
+    @classmethod
     @provide_session
-    def sync_to_db(self, owner=None, sync_time=None, session=None):
+    def bulk_sync_to_db(cls, dags: List["DAG"], sync_time=None, session=None):
         """
-        Save attributes about this DAG to the DB. Note that this method
+        Save attributes about list of DAG to the DB. Note that this method
         can be called for both DAGs and SubDAGs. A SubDag is actually a
         SubDagOperator.
 
-        :param dag: the DAG object to save to the DB
-        :type dag: airflow.models.DAG
+        :param dags: the DAG objects to save to the DB
+        :type dags: List[airflow.models.dag.DAG]
         :param sync_time: The time that the DAG should be marked as sync'ed
         :type sync_time: datetime
         :return: None
         """
+        if not dags:
+            return
         from airflow.models.serialized_dag import SerializedDagModel
 
-        if owner is None:
-            owner = self.owner
         if sync_time is None:
             sync_time = timezone.utcnow()
-
-        orm_dag = session.query(
-            DagModel).filter(DagModel.dag_id == self.dag_id).first()
-        if not orm_dag:
-            orm_dag = DagModel(dag_id=self.dag_id)
-            if self.is_paused_upon_creation is not None:
-                orm_dag.is_paused = self.is_paused_upon_creation
-            self.log.info("Creating ORM DAG for %s", self.dag_id)
+        log.info("Sync %s DAGs", len(dags))
+        dag_by_ids = {dag.dag_id: dag for dag in dags}
+        dag_ids = set(dag_by_ids.keys())
+        orm_dags = session.query(DagModel)\
+            .options(
+            joinedload(DagModel.tags, innerjoin=False)
 
 Review comment:
   Here is the PR about DAG sync: https://github.com/apache/airflow/pull/7477


[GitHub] [airflow] mik-laj merged pull request #7476: [AIRFLOW-6856] Bulk fetch paused_dag_ids

Posted by GitBox <gi...@apache.org>.
mik-laj merged pull request #7476: [AIRFLOW-6856] Bulk fetch paused_dag_ids
URL: https://github.com/apache/airflow/pull/7476
 
 
   


[GitHub] [airflow] codecov-io commented on issue #7476: [AIRFLOW-6856] Bulk fetch paused_dag_ids

Posted by GitBox <gi...@apache.org>.
codecov-io commented on issue #7476: [AIRFLOW-6856] Bulk fetch paused_dag_ids
URL: https://github.com/apache/airflow/pull/7476#issuecomment-589383726
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=h1) Report
   > Merging [#7476](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/1a9a9f7618f1c22e3e9a6ef4ec73b717c7760c7d?src=pr&el=desc) will **decrease** coverage by `0.64%`.
   > The diff coverage is `100%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7476/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #7476      +/-   ##
   ==========================================
   - Coverage   86.68%   86.04%   -0.65%     
   ==========================================
     Files         882      882              
     Lines       41526    41593      +67     
   ==========================================
   - Hits        35997    35788     -209     
   - Misses       5529     5805     +276
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==) | `85.71% <ø> (-2.41%)` | :arrow_down: |
   | [airflow/dag/base\_dag.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9kYWcvYmFzZV9kYWcucHk=) | `69.56% <ø> (+1.56%)` | :arrow_up: |
   | [airflow/jobs/scheduler\_job.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9qb2JzL3NjaGVkdWxlcl9qb2IucHk=) | `89.11% <100%> (-0.56%)` | :arrow_down: |
   | [...w/providers/apache/hive/operators/mysql\_to\_hive.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvYXBhY2hlL2hpdmUvb3BlcmF0b3JzL215c3FsX3RvX2hpdmUucHk=) | `35.84% <0%> (-64.16%)` | :arrow_down: |
   | [airflow/operators/generic\_transfer.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvZ2VuZXJpY190cmFuc2Zlci5weQ==) | `39.28% <0%> (-60.72%)` | :arrow_down: |
   | [airflow/kubernetes/volume\_mount.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZV9tb3VudC5weQ==) | `44.44% <0%> (-55.56%)` | :arrow_down: |
   | [airflow/providers/postgres/operators/postgres.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvcG9zdGdyZXMvb3BlcmF0b3JzL3Bvc3RncmVzLnB5) | `50% <0%> (-50%)` | :arrow_down: |
   | [airflow/kubernetes/volume.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZS5weQ==) | `52.94% <0%> (-47.06%)` | :arrow_down: |
   | [airflow/security/kerberos.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9zZWN1cml0eS9rZXJiZXJvcy5weQ==) | `30.43% <0%> (-45.66%)` | :arrow_down: |
   | [airflow/kubernetes/pod\_launcher.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3BvZF9sYXVuY2hlci5weQ==) | `47.18% <0%> (-45.08%)` | :arrow_down: |
   | ... and [15 more](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=footer). Last update [1a9a9f7...5b33dc5](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [airflow] kaxil commented on a change in pull request #7476: [AIRFLOW-6856] Bulk fetch paused_dag_ids

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7476: [AIRFLOW-6856] Bulk fetch paused_dag_ids
URL: https://github.com/apache/airflow/pull/7476#discussion_r382550775
 
 

 ##########
 File path: airflow/models/dag.py
 ##########
 @@ -1448,63 +1448,88 @@ def create_dagrun(self,
 
         return run
 
+    @classmethod
     @provide_session
-    def sync_to_db(self, owner=None, sync_time=None, session=None):
+    def bulk_sync_to_db(cls, dags: List["DAG"], sync_time=None, session=None):
         """
-        Save attributes about this DAG to the DB. Note that this method
+        Save attributes about list of DAG to the DB. Note that this method
         can be called for both DAGs and SubDAGs. A SubDag is actually a
         SubDagOperator.
 
-        :param dag: the DAG object to save to the DB
-        :type dag: airflow.models.DAG
+        :param dags: the DAG objects to save to the DB
+        :type dags: List[airflow.models.dag.DAG]
         :param sync_time: The time that the DAG should be marked as sync'ed
         :type sync_time: datetime
         :return: None
         """
+        if not dags:
+            return
         from airflow.models.serialized_dag import SerializedDagModel
 
-        if owner is None:
-            owner = self.owner
         if sync_time is None:
             sync_time = timezone.utcnow()
-
-        orm_dag = session.query(
-            DagModel).filter(DagModel.dag_id == self.dag_id).first()
-        if not orm_dag:
-            orm_dag = DagModel(dag_id=self.dag_id)
-            if self.is_paused_upon_creation is not None:
-                orm_dag.is_paused = self.is_paused_upon_creation
-            self.log.info("Creating ORM DAG for %s", self.dag_id)
+        log.info("Sync %s DAGs", len(dags))
 
 Review comment:
   
   ```suggestion
           self.log.info("Sync %s DAGs", len(dags))
   ```
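
One point to keep in mind with this suggestion: the diff above turns the method into a `@classmethod`, so there is no `self` inside it, and `self.log` would raise a `NameError` if the method stays that way. A minimal sketch of the module-level logger alternative follows. The class name `DagSketch` is a hypothetical stand-in, and only the two lines visible in the diff are reproduced.

```python
import logging

# Module-level logger: usable from classmethods because it needs no instance.
log = logging.getLogger(__name__)


class DagSketch:  # hypothetical stand-in for the DAG class in the diff
    @classmethod
    def bulk_sync_to_db(cls, dags):
        if not dags:
            return
        log.info("Sync %s DAGs", len(dags))


DagSketch.bulk_sync_to_db(["dag_a", "dag_b"])
```

A class-level logger reached through `cls` would work equally well; the module-level form simply mirrors the `log.info(...)` call already present in the diff.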


[GitHub] [airflow] mik-laj commented on a change in pull request #7476: [AIRFLOW-6856][depends on AIRFLOW-6862][WIP] Bulk fetch paused_dag_ids

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7476: [AIRFLOW-6856][depends on AIRFLOW-6862][WIP] Bulk fetch paused_dag_ids
URL: https://github.com/apache/airflow/pull/7476#discussion_r383696220
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -672,33 +672,24 @@ def _process_task_instances(self, dag, task_instances_list, session=None):
                     self.log.debug('Queuing task: %s', ti)
                     task_instances_list.append(ti.key)
 
-    def _process_dags(self, dagbag, dags, tis_out):
+    @provide_session
+    def _process_dags(self, dags, tis_out, session=None):
 
 Review comment:
   Adding types for classes from the core often causes cyclical imports, so we must be careful.
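
One common way to annotate with core classes without importing them at runtime is the `typing.TYPE_CHECKING` guard. The sketch below assumes that pattern and is not the change made in this PR. The class name `SchedulerSketch` is a hypothetical stand-in, and the method body is a placeholder.

```python
from typing import TYPE_CHECKING, List

if TYPE_CHECKING:
    # Only static type checkers evaluate this branch; at runtime the import is
    # skipped, so it cannot contribute to an import cycle.
    from airflow.models.dag import DAG


class SchedulerSketch:  # hypothetical stand-in for the class that owns _process_dags
    def _process_dags(self, dags: "List[DAG]", tis_out: list, session=None) -> None:
        """Placeholder body; only the annotations matter for this sketch."""


# String annotations are stored as written and are not resolved at definition time.
print(SchedulerSketch._process_dags.__annotations__["dags"])
```

Because the annotation is a string, the module imports cleanly even when Airflow is not installed or when `airflow.models.dag` itself imports the scheduler module.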


[GitHub] [airflow] vardancse commented on a change in pull request #7476: [AIRFLOW-6856] Bulk fetch paused_dag_ids

Posted by GitBox <gi...@apache.org>.
vardancse commented on a change in pull request #7476: [AIRFLOW-6856] Bulk fetch paused_dag_ids
URL: https://github.com/apache/airflow/pull/7476#discussion_r386208633
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -816,7 +811,12 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None):
         # Save individual DAGs in the ORM and update DagModel.last_scheduled_time
         dagbag.sync_to_db()
 
-        paused_dag_ids = {dag.dag_id for dag in dagbag.dags.values() if dag.is_paused}
+        paused_dag_ids = (
+            session.query(DagModel.dag_id)
+            .filter(DagModel.is_paused.is_(True))
+            .filter(DagModel.dag_id.in_(dagbag.dag_ids))
 
 Review comment:
   Is an IN query recommended here for use cases with tens of thousands of paused DAGs? Shouldn't the query be broken into smaller batches?
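
For reference, batching an `IN` query could look roughly like the sketch below. It uses the standard-library `sqlite3` module instead of SQLAlchemy. The `dag` table, the helper names `chunked` and `fetch_paused_dag_ids`, and the batch size of 500 are all assumptions for illustration, not code from this PR.

```python
import sqlite3
from typing import Iterable, Iterator, List, Set


def chunked(items: List[str], size: int) -> Iterator[List[str]]:
    """Yield consecutive slices of ``items`` with at most ``size`` elements each."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def fetch_paused_dag_ids(
    conn: sqlite3.Connection, dag_ids: Iterable[str], batch_size: int = 500
) -> Set[str]:
    """Collect paused DAG IDs, issuing one IN query per batch of IDs."""
    paused: Set[str] = set()
    for batch in chunked(list(dag_ids), batch_size):
        placeholders = ", ".join("?" for _ in batch)
        rows = conn.execute(
            f"SELECT dag_id FROM dag WHERE is_paused = 1 AND dag_id IN ({placeholders})",
            batch,
        )
        paused.update(dag_id for (dag_id,) in rows)
    return paused


# Demo data: 1200 DAGs, every third one paused.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, is_paused INTEGER NOT NULL)")
all_ids = [f"dag_{i}" for i in range(1200)]
conn.executemany(
    "INSERT INTO dag (dag_id, is_paused) VALUES (?, ?)",
    [(dag_id, int(i % 3 == 0)) for i, dag_id in enumerate(all_ids)],
)
paused = fetch_paused_dag_ids(conn, all_ids, batch_size=500)
print(len(paused))
```

Batching trades one large statement for several smaller ones, which mainly matters when the ID list can exceed the database's limit on bound parameters.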


[GitHub] [airflow] vardancse commented on a change in pull request #7476: [AIRFLOW-6856] Bulk fetch paused_dag_ids

Posted by GitBox <gi...@apache.org>.
vardancse commented on a change in pull request #7476: [AIRFLOW-6856] Bulk fetch paused_dag_ids
URL: https://github.com/apache/airflow/pull/7476#discussion_r386341140
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -816,7 +811,12 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None):
         # Save individual DAGs in the ORM and update DagModel.last_scheduled_time
         dagbag.sync_to_db()
 
-        paused_dag_ids = {dag.dag_id for dag in dagbag.dags.values() if dag.is_paused}
+        paused_dag_ids = (
+            session.query(DagModel.dag_id)
+            .filter(DagModel.is_paused.is_(True))
+            .filter(DagModel.dag_id.in_(dagbag.dag_ids))
 
 Review comment:
   Sorry for the trouble. I missed the **DAGs from one file** part; earlier I thought we were scanning across the folder.


[GitHub] [airflow] codecov-io edited a comment on issue #7476: [AIRFLOW-6856] Bulk fetch paused_dag_ids

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7476: [AIRFLOW-6856] Bulk fetch paused_dag_ids
URL: https://github.com/apache/airflow/pull/7476#issuecomment-589383726
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=h1) Report
   > Merging [#7476](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/1a9a9f7618f1c22e3e9a6ef4ec73b717c7760c7d?src=pr&el=desc) will **decrease** coverage by `0.28%`.
   > The diff coverage is `100%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7476/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff            @@
   ##           master   #7476      +/-   ##
   =========================================
   - Coverage   86.68%   86.4%   -0.29%     
   =========================================
     Files         882     882              
     Lines       41526   41593      +67     
   =========================================
   - Hits        35997   35937      -60     
   - Misses       5529    5656     +127
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==) | `88.22% <ø> (+0.1%)` | :arrow_up: |
   | [airflow/dag/base\_dag.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9kYWcvYmFzZV9kYWcucHk=) | `69.56% <ø> (+1.56%)` | :arrow_up: |
   | [airflow/jobs/scheduler\_job.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9qb2JzL3NjaGVkdWxlcl9qb2IucHk=) | `89.69% <100%> (+0.03%)` | :arrow_up: |
   | [...w/providers/apache/hive/operators/mysql\_to\_hive.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvYXBhY2hlL2hpdmUvb3BlcmF0b3JzL215c3FsX3RvX2hpdmUucHk=) | `100% <0%> (ø)` | :arrow_up: |
   | [airflow/operators/generic\_transfer.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvZ2VuZXJpY190cmFuc2Zlci5weQ==) | `100% <0%> (ø)` | :arrow_up: |
   | [airflow/kubernetes/volume\_mount.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZV9tb3VudC5weQ==) | `44.44% <0%> (-55.56%)` | :arrow_down: |
   | [airflow/providers/postgres/operators/postgres.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvcG9zdGdyZXMvb3BlcmF0b3JzL3Bvc3RncmVzLnB5) | `100% <0%> (ø)` | :arrow_up: |
   | [airflow/kubernetes/volume.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZS5weQ==) | `52.94% <0%> (-47.06%)` | :arrow_down: |
   | [airflow/security/kerberos.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9zZWN1cml0eS9rZXJiZXJvcy5weQ==) | `76.08% <0%> (ø)` | :arrow_up: |
   | [airflow/kubernetes/pod\_launcher.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3BvZF9sYXVuY2hlci5weQ==) | `47.18% <0%> (-45.08%)` | :arrow_down: |
   | ... and [15 more](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=footer). Last update [1a9a9f7...5b33dc5](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [airflow] mik-laj commented on a change in pull request #7476: [AIRFLOW-6856][depends on AIRFLOW-6862][WIP] Bulk fetch paused_dag_ids

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7476: [AIRFLOW-6856][depends on AIRFLOW-6862][WIP] Bulk fetch paused_dag_ids
URL: https://github.com/apache/airflow/pull/7476#discussion_r383695112
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -672,33 +672,24 @@ def _process_task_instances(self, dag, task_instances_list, session=None):
                     self.log.debug('Queuing task: %s', ti)
                     task_instances_list.append(ti.key)
 
-    def _process_dags(self, dagbag, dags, tis_out):
+    @provide_session
+    def _process_dags(self, dags, tis_out, session=None):
 
 Review comment:
   I want to add types for the scheduler in a separate PR. In this PR and the other performance PRs, I try to avoid unnecessary changes.


[GitHub] [airflow] codecov-io edited a comment on issue #7476: [AIRFLOW-6856][depends on AIRFLOW-6857, AIRFLOW-6862] Bulk fetch paused_dag_ids

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7476: [AIRFLOW-6856][depends on AIRFLOW-6857,AIRFLOW-6862] Bulk fetch paused_dag_ids
URL: https://github.com/apache/airflow/pull/7476#issuecomment-589383726
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=h1) Report
   > Merging [#7476](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/1a9a9f7618f1c22e3e9a6ef4ec73b717c7760c7d?src=pr&el=desc) will **decrease** coverage by `0.26%`.
   > The diff coverage is `97.91%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7476/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #7476      +/-   ##
   ==========================================
   - Coverage   86.68%   86.42%   -0.27%     
   ==========================================
     Files         882      883       +1     
     Lines       41526    41648     +122     
   ==========================================
   - Hits        35997    35994       -3     
   - Misses       5529     5654     +125
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==) | `88.22% <ø> (+0.1%)` | :arrow_up: |
   | [airflow/dag/base\_dag.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9kYWcvYmFzZV9kYWcucHk=) | `69.56% <ø> (+1.56%)` | :arrow_up: |
   | [airflow/jobs/scheduler\_job.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9qb2JzL3NjaGVkdWxlcl9qb2IucHk=) | `89.89% <100%> (+0.23%)` | :arrow_up: |
   | [airflow/utils/db.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYi5weQ==) | `98.29% <100%> (-0.02%)` | :arrow_down: |
   | [airflow/models/dag.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnLnB5) | `91.06% <97.56%> (+0.16%)` | :arrow_up: |
   | [airflow/kubernetes/volume\_mount.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZV9tb3VudC5weQ==) | `44.44% <0%> (-55.56%)` | :arrow_down: |
   | [airflow/kubernetes/volume.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZS5weQ==) | `52.94% <0%> (-47.06%)` | :arrow_down: |
   | [airflow/kubernetes/pod\_launcher.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3BvZF9sYXVuY2hlci5weQ==) | `47.18% <0%> (-45.08%)` | :arrow_down: |
   | [...viders/cncf/kubernetes/operators/kubernetes\_pod.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvY25jZi9rdWJlcm5ldGVzL29wZXJhdG9ycy9rdWJlcm5ldGVzX3BvZC5weQ==) | `69.38% <0%> (-25.52%)` | :arrow_down: |
   | [airflow/kubernetes/refresh\_config.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3JlZnJlc2hfY29uZmlnLnB5) | `50.98% <0%> (-23.53%)` | :arrow_down: |
   | ... and [10 more](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=footer). Last update [1a9a9f7...ea3997b](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [airflow] codecov-io edited a comment on issue #7476: [AIRFLOW-6856][depends on AIRFLOW-6862][WIP] Bulk fetch paused_dag_ids

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7476: [AIRFLOW-6856][depends on AIRFLOW-6862][WIP] Bulk fetch paused_dag_ids
URL: https://github.com/apache/airflow/pull/7476#issuecomment-589383726
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=h1) Report
   > Merging [#7476](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/0ec2774120d43fa667a371b384e6006e1d1c7821?src=pr&el=desc) will **decrease** coverage by `0.36%`.
   > The diff coverage is `100%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7476/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #7476      +/-   ##
   ==========================================
   - Coverage   86.81%   86.44%   -0.37%     
   ==========================================
     Files         893      893              
     Lines       42193    42342     +149     
   ==========================================
   - Hits        36629    36604      -25     
   - Misses       5564     5738     +174
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==) | `88.63% <ø> (+0.7%)` | :arrow_up: |
   | [airflow/dag/base\_dag.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9kYWcvYmFzZV9kYWcucHk=) | `69.56% <ø> (+1.56%)` | :arrow_up: |
   | [airflow/jobs/scheduler\_job.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9qb2JzL3NjaGVkdWxlcl9qb2IucHk=) | `90.29% <100%> (+0.05%)` | :arrow_up: |
   | [airflow/kubernetes/volume\_mount.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZV9tb3VudC5weQ==) | `44.44% <0%> (-55.56%)` | :arrow_down: |
   | [airflow/providers/postgres/operators/postgres.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvcG9zdGdyZXMvb3BlcmF0b3JzL3Bvc3RncmVzLnB5) | `50% <0%> (-50%)` | :arrow_down: |
   | [airflow/kubernetes/volume.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZS5weQ==) | `52.94% <0%> (-47.06%)` | :arrow_down: |
   | [airflow/kubernetes/pod\_launcher.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3BvZF9sYXVuY2hlci5weQ==) | `47.18% <0%> (-45.08%)` | :arrow_down: |
   | [...roviders/google/cloud/operators/postgres\_to\_gcs.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvZ29vZ2xlL2Nsb3VkL29wZXJhdG9ycy9wb3N0Z3Jlc190b19nY3MucHk=) | `52.94% <0%> (-32.36%)` | :arrow_down: |
   | [...viders/cncf/kubernetes/operators/kubernetes\_pod.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvY25jZi9rdWJlcm5ldGVzL29wZXJhdG9ycy9rdWJlcm5ldGVzX3BvZC5weQ==) | `69.69% <0%> (-25.26%)` | :arrow_down: |
   | [airflow/kubernetes/refresh\_config.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3JlZnJlc2hfY29uZmlnLnB5) | `50.98% <0%> (-23.53%)` | :arrow_down: |
   | ... and [15 more](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=footer). Last update [0ec2774...72478de](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [airflow] mik-laj commented on a change in pull request #7476: [AIRFLOW-6856] Bulk fetch paused_dag_ids

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7476: [AIRFLOW-6856] Bulk fetch paused_dag_ids
URL: https://github.com/apache/airflow/pull/7476#discussion_r386300967
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -816,7 +811,12 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None):
         # Save individual DAGs in the ORM and update DagModel.last_scheduled_time
         dagbag.sync_to_db()
 
-        paused_dag_ids = {dag.dag_id for dag in dagbag.dags.values() if dag.is_paused}
+        paused_dag_ids = (
+            session.query(DagModel.dag_id)
+            .filter(DagModel.is_paused.is_(True))
+            .filter(DagModel.dag_id.in_(dagbag.dag_ids))
 
 Review comment:
   This query is executed only for the DAGs from a single file, so you would need a few thousand DAGs in one file for it to cause problems. For now, I have focused on the case of up to 200 DAGs. Supporting several thousand DAGs in one file would require many more optimizations, and this one alone would not change anything.
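
The bulk lookup discussed in this review comment can be sketched outside Airflow. The following is a minimal illustration, not the PR's code: it uses the standard-library `sqlite3` module in place of SQLAlchemy, and the two-column `dag` table and the `fetch_paused_dag_ids` helper are hypothetical stand-ins. It shows a single `SELECT ... IN (...)` query returning the paused subset of one file's DAG ids, instead of one lookup per DAG.

```python
import sqlite3

# Hypothetical, simplified stand-in for Airflow's `dag` table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, is_paused INTEGER NOT NULL)")
conn.executemany(
    "INSERT INTO dag VALUES (?, ?)",
    [("dag_1", 1), ("dag_2", 0), ("dag_3", 1), ("other_file_dag", 1)],
)


def fetch_paused_dag_ids(conn, dag_ids):
    """Return the paused subset of dag_ids using one SELECT ... IN query."""
    dag_ids = list(dag_ids)
    if not dag_ids:
        # An empty IN () list is not valid SQL, so short-circuit here.
        return set()
    # One bound parameter per DAG id from the file being processed.
    placeholders = ", ".join("?" for _ in dag_ids)
    rows = conn.execute(
        f"SELECT dag_id FROM dag WHERE is_paused = 1 AND dag_id IN ({placeholders})",
        dag_ids,
    ).fetchall()
    # Each row is a 1-tuple; unpack it to get a plain set of ids.
    return {dag_id for (dag_id,) in rows}


paused_dag_ids = fetch_paused_dag_ids(conn, ["dag_1", "dag_2", "dag_3"])
print(sorted(paused_dag_ids))  # ['dag_1', 'dag_3']
```

Because the query returns 1-tuples, the rows are unpacked into a set so that membership checks against plain `dag_id` strings work. The number of bound parameters grows with the number of DAGs in the file, which is the scaling concern the comment addresses.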


[GitHub] [airflow] codecov-io edited a comment on issue #7476: [AIRFLOW-6856][depends on AIRFLOW-6857, AIRFLOW-6862] Bulk fetch paused_dag_ids

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7476: [AIRFLOW-6856][depends on AIRFLOW-6857,AIRFLOW-6862] Bulk fetch paused_dag_ids
URL: https://github.com/apache/airflow/pull/7476#issuecomment-589383726
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=h1) Report
   > Merging [#7476](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/1a9a9f7618f1c22e3e9a6ef4ec73b717c7760c7d?src=pr&el=desc) will **decrease** coverage by `0.43%`.
   > The diff coverage is `97.91%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7476/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #7476      +/-   ##
   ==========================================
   - Coverage   86.68%   86.24%   -0.44%     
   ==========================================
     Files         882      883       +1     
     Lines       41526    41648     +122     
   ==========================================
   - Hits        35997    35921      -76     
   - Misses       5529     5727     +198
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==) | `88.22% <ø> (+0.1%)` | :arrow_up: |
   | [airflow/dag/base\_dag.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9kYWcvYmFzZV9kYWcucHk=) | `69.56% <ø> (+1.56%)` | :arrow_up: |
   | [airflow/jobs/scheduler\_job.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9qb2JzL3NjaGVkdWxlcl9qb2IucHk=) | `89.75% <100%> (+0.08%)` | :arrow_up: |
   | [airflow/utils/db.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYi5weQ==) | `98.29% <100%> (-0.02%)` | :arrow_down: |
   | [airflow/models/dag.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnLnB5) | `91.06% <97.56%> (+0.16%)` | :arrow_up: |
   | [...w/providers/apache/hive/operators/mysql\_to\_hive.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvYXBhY2hlL2hpdmUvb3BlcmF0b3JzL215c3FsX3RvX2hpdmUucHk=) | `35.84% <0%> (-64.16%)` | :arrow_down: |
   | [airflow/kubernetes/volume\_mount.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZV9tb3VudC5weQ==) | `44.44% <0%> (-55.56%)` | :arrow_down: |
   | [airflow/kubernetes/volume.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZS5weQ==) | `52.94% <0%> (-47.06%)` | :arrow_down: |
   | [airflow/security/kerberos.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9zZWN1cml0eS9rZXJiZXJvcy5weQ==) | `30.43% <0%> (-45.66%)` | :arrow_down: |
   | [airflow/kubernetes/pod\_launcher.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3BvZF9sYXVuY2hlci5weQ==) | `47.18% <0%> (-45.08%)` | :arrow_down: |
   | ... and [17 more](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=footer). Last update [1a9a9f7...ea3997b](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [airflow] codecov-io edited a comment on issue #7476: [AIRFLOW-6856][depends on AIRFLOW-6862] Bulk fetch paused_dag_ids

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7476: [AIRFLOW-6856][depends on AIRFLOW-6862] Bulk fetch paused_dag_ids
URL: https://github.com/apache/airflow/pull/7476#issuecomment-589383726
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=h1) Report
   > Merging [#7476](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/1a9a9f7618f1c22e3e9a6ef4ec73b717c7760c7d?src=pr&el=desc) will **decrease** coverage by `0.24%`.
   > The diff coverage is `100%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7476/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #7476      +/-   ##
   ==========================================
   - Coverage   86.68%   86.43%   -0.25%     
   ==========================================
     Files         882      883       +1     
     Lines       41526    41626     +100     
   ==========================================
   - Hits        35997    35980      -17     
   - Misses       5529     5646     +117
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==) | `88.22% <ø> (+0.1%)` | :arrow_up: |
   | [airflow/dag/base\_dag.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9kYWcvYmFzZV9kYWcucHk=) | `69.56% <ø> (+1.56%)` | :arrow_up: |
   | [airflow/jobs/scheduler\_job.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9qb2JzL3NjaGVkdWxlcl9qb2IucHk=) | `89.91% <100%> (+0.24%)` | :arrow_up: |
   | [airflow/kubernetes/volume\_mount.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZV9tb3VudC5weQ==) | `44.44% <0%> (-55.56%)` | :arrow_down: |
   | [airflow/kubernetes/volume.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZS5weQ==) | `52.94% <0%> (-47.06%)` | :arrow_down: |
   | [airflow/kubernetes/pod\_launcher.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3BvZF9sYXVuY2hlci5weQ==) | `47.18% <0%> (-45.08%)` | :arrow_down: |
   | [...viders/cncf/kubernetes/operators/kubernetes\_pod.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvY25jZi9rdWJlcm5ldGVzL29wZXJhdG9ycy9rdWJlcm5ldGVzX3BvZC5weQ==) | `69.38% <0%> (-25.52%)` | :arrow_down: |
   | [airflow/kubernetes/refresh\_config.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3JlZnJlc2hfY29uZmlnLnB5) | `50.98% <0%> (-23.53%)` | :arrow_down: |
   | [...oogle/marketing\_platform/hooks/campaign\_manager.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvZ29vZ2xlL21hcmtldGluZ19wbGF0Zm9ybS9ob29rcy9jYW1wYWlnbl9tYW5hZ2VyLnB5) | `89.23% <0%> (-10.77%)` | :arrow_down: |
   | [...\_platform/example\_dags/example\_campaign\_manager.py](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvZ29vZ2xlL21hcmtldGluZ19wbGF0Zm9ybS9leGFtcGxlX2RhZ3MvZXhhbXBsZV9jYW1wYWlnbl9tYW5hZ2VyLnB5) | `93.93% <0%> (-6.07%)` | :arrow_down: |
   | ... and [15 more](https://codecov.io/gh/apache/airflow/pull/7476/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=footer). Last update [1a9a9f7...ef3622f](https://codecov.io/gh/apache/airflow/pull/7476?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [airflow] mik-laj commented on a change in pull request #7476: [AIRFLOW-6856][depends on AIRFLOW-6862][WIP] Bulk fetch paused_dag_ids

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7476: [AIRFLOW-6856][depends on AIRFLOW-6862][WIP] Bulk fetch paused_dag_ids
URL: https://github.com/apache/airflow/pull/7476#discussion_r383695112
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -672,33 +672,24 @@ def _process_task_instances(self, dag, task_instances_list, session=None):
                     self.log.debug('Queuing task: %s', ti)
                     task_instances_list.append(ti.key)
 
-    def _process_dags(self, dagbag, dags, tis_out):
+    @provide_session
+    def _process_dags(self, dags, tis_out, session=None):
 
 Review comment:
   I want to add types for the scheduler in a separate PR. In this PR and the other performance PRs, I try to avoid additional changes.


[GitHub] [airflow] mik-laj commented on a change in pull request #7476: [AIRFLOW-6856] Bulk fetch paused_dag_ids

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7476: [AIRFLOW-6856] Bulk fetch paused_dag_ids
URL: https://github.com/apache/airflow/pull/7476#discussion_r386300967
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -816,7 +811,12 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None):
         # Save individual DAGs in the ORM and update DagModel.last_scheduled_time
         dagbag.sync_to_db()
 
-        paused_dag_ids = {dag.dag_id for dag in dagbag.dags.values() if dag.is_paused}
+        paused_dag_ids = (
+            session.query(DagModel.dag_id)
+            .filter(DagModel.is_paused.is_(True))
+            .filter(DagModel.dag_id.in_(dagbag.dag_ids))
 
 Review comment:
   This query is executed only for the DAGs from a single file. You would need several thousand DAGs in one file for it to cause problems. For now, I have focused on the case of up to 200 DAGs. Supporting several thousand DAGs in one file would require many more optimizations, and this one alone would not change anything.


[GitHub] [airflow] mik-laj commented on a change in pull request #7476: [AIRFLOW-6856] Bulk fetch paused_dag_ids

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7476: [AIRFLOW-6856] Bulk fetch paused_dag_ids
URL: https://github.com/apache/airflow/pull/7476#discussion_r382545084
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -780,7 +788,12 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None):
         for dag in dagbag.dags.values():
             dag.sync_to_db()
 
-        paused_dag_ids = {dag.dag_id for dag in dagbag.dags.values() if dag.is_paused}
+        paused_dag_ids = (
+            session.query(DagModel.dag_id)
+            .filter(DagModel.is_paused.is_(True))
+            .filter(DagModel.dag_id.in_(dagbag.dag_ids))
+            .all()
+        )
 
 Review comment:
   Good catch. The `dags` parameter contains only active DAGs, so we don't have to check it a second time.


[GitHub] [airflow] ashb commented on a change in pull request #7476: [AIRFLOW-6856][depends on AIRFLOW-6857, AIRFLOW-6862] Bulk fetch paused_dag_ids

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7476: [AIRFLOW-6856][depends on AIRFLOW-6857,AIRFLOW-6862] Bulk fetch paused_dag_ids
URL: https://github.com/apache/airflow/pull/7476#discussion_r382559852
 
 

 ##########
 File path: airflow/models/dag.py
 ##########
 @@ -1448,63 +1448,88 @@ def create_dagrun(self,
 
         return run
 
+    @classmethod
     @provide_session
-    def sync_to_db(self, owner=None, sync_time=None, session=None):
+    def bulk_sync_to_db(cls, dags: List["DAG"], sync_time=None, session=None):
         """
-        Save attributes about this DAG to the DB. Note that this method
+        Save attributes about list of DAG to the DB. Note that this method
         can be called for both DAGs and SubDAGs. A SubDag is actually a
         SubDagOperator.
 
-        :param dag: the DAG object to save to the DB
-        :type dag: airflow.models.DAG
+        :param dags: the DAG objects to save to the DB
+        :type dags: List[airflow.models.dag.DAG]
         :param sync_time: The time that the DAG should be marked as sync'ed
         :type sync_time: datetime
         :return: None
         """
+        if not dags:
+            return
         from airflow.models.serialized_dag import SerializedDagModel
 
-        if owner is None:
-            owner = self.owner
         if sync_time is None:
             sync_time = timezone.utcnow()
-
-        orm_dag = session.query(
-            DagModel).filter(DagModel.dag_id == self.dag_id).first()
-        if not orm_dag:
-            orm_dag = DagModel(dag_id=self.dag_id)
-            if self.is_paused_upon_creation is not None:
-                orm_dag.is_paused = self.is_paused_upon_creation
-            self.log.info("Creating ORM DAG for %s", self.dag_id)
+        log.info("Sync %s DAGs", len(dags))
+        dag_by_ids = {dag.dag_id: dag for dag in dags}
+        dag_ids = set(dag_by_ids.keys())
+        orm_dags = session.query(DagModel)\
+            .options(
+            joinedload(DagModel.tags, innerjoin=False)
 
 Review comment:
   This loads all the tags for all the dags we've loaded in one query, rather than needing one query for each dag. The latter is commonly called an `n+1` query situation (which, as Kamil has shown, is expensive and results in lots of extra queries).


[GitHub] [airflow] ashb commented on a change in pull request #7476: [AIRFLOW-6856][depends on AIRFLOW-6857, AIRFLOW-6862] Bulk fetch paused_dag_ids

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7476: [AIRFLOW-6856][depends on AIRFLOW-6857,AIRFLOW-6862] Bulk fetch paused_dag_ids
URL: https://github.com/apache/airflow/pull/7476#discussion_r382559852
 
 

 ##########
 File path: airflow/models/dag.py
 ##########
 @@ -1448,63 +1448,88 @@ def create_dagrun(self,
 
         return run
 
+    @classmethod
     @provide_session
-    def sync_to_db(self, owner=None, sync_time=None, session=None):
+    def bulk_sync_to_db(cls, dags: List["DAG"], sync_time=None, session=None):
         """
-        Save attributes about this DAG to the DB. Note that this method
+        Save attributes about list of DAG to the DB. Note that this method
         can be called for both DAGs and SubDAGs. A SubDag is actually a
         SubDagOperator.
 
-        :param dag: the DAG object to save to the DB
-        :type dag: airflow.models.DAG
+        :param dags: the DAG objects to save to the DB
+        :type dags: List[airflow.models.dag.DAG]
         :param sync_time: The time that the DAG should be marked as sync'ed
         :type sync_time: datetime
         :return: None
         """
+        if not dags:
+            return
         from airflow.models.serialized_dag import SerializedDagModel
 
-        if owner is None:
-            owner = self.owner
         if sync_time is None:
             sync_time = timezone.utcnow()
-
-        orm_dag = session.query(
-            DagModel).filter(DagModel.dag_id == self.dag_id).first()
-        if not orm_dag:
-            orm_dag = DagModel(dag_id=self.dag_id)
-            if self.is_paused_upon_creation is not None:
-                orm_dag.is_paused = self.is_paused_upon_creation
-            self.log.info("Creating ORM DAG for %s", self.dag_id)
+        log.info("Sync %s DAGs", len(dags))
+        dag_by_ids = {dag.dag_id: dag for dag in dags}
+        dag_ids = set(dag_by_ids.keys())
+        orm_dags = session.query(DagModel)\
+            .options(
+            joinedload(DagModel.tags, innerjoin=False)
 
 Review comment:
   This loads all the tags for all the dags we've loaded in one query, rather than needing one query for each dag. The per-dag pattern is commonly called an `n+1` query situation, which, as Kamil has shown, is expensive because it results in lots of extra queries.
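
   As a rough illustration of the difference in query count, here is a minimal sketch using the standard-library `sqlite3` module rather than SQLAlchemy. The `dag` and `dag_tag` tables and both helper functions are hypothetical stand-ins, not Airflow's real schema or code. The second helper issues a single LEFT OUTER JOIN, which is the kind of statement `joinedload(..., innerjoin=False)` asks SQLAlchemy to emit.

```python
import sqlite3
from collections import defaultdict

# Hypothetical two-table schema standing in for DagModel and its tags.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dag (dag_id TEXT PRIMARY KEY);
    CREATE TABLE dag_tag (dag_id TEXT, name TEXT);
""")
dag_ids = [f"dag_{i}" for i in range(1, 50)]  # 49 DAGs
conn.executemany("INSERT INTO dag VALUES (?)", [(d,) for d in dag_ids])
# Tag every DAG except the last, so the outer join has an untagged row to keep.
conn.executemany(
    "INSERT INTO dag_tag VALUES (?, ?)", [(d, "perf") for d in dag_ids[:-1]]
)


def tags_n_plus_one(conn):
    """One query for the DAGs, then one more query per DAG for its tags."""
    queries = 1
    rows = conn.execute("SELECT dag_id FROM dag").fetchall()
    tags = {}
    for (dag_id,) in rows:
        queries += 1
        cursor = conn.execute(
            "SELECT name FROM dag_tag WHERE dag_id = ?", (dag_id,)
        )
        tags[dag_id] = [name for (name,) in cursor]
    return tags, queries


def tags_joined(conn):
    """A single LEFT OUTER JOIN returns every DAG together with its tags."""
    tags = defaultdict(list)
    rows = conn.execute(
        "SELECT dag.dag_id, dag_tag.name FROM dag "
        "LEFT OUTER JOIN dag_tag ON dag_tag.dag_id = dag.dag_id"
    )
    for dag_id, name in rows:
        tags[dag_id]  # touch the key so untagged DAGs still appear
        if name is not None:
            tags[dag_id].append(name)
    return dict(tags), 1


slow, slow_queries = tags_n_plus_one(conn)
fast, fast_queries = tags_joined(conn)
print(slow_queries, fast_queries)  # 50 1
```

   Both helpers return the same mapping; only the number of round trips differs, and the gap grows linearly with the number of dags.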


[GitHub] [airflow] kaxil commented on a change in pull request #7476: [AIRFLOW-6856] Bulk fetch paused_dag_ids

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7476: [AIRFLOW-6856] Bulk fetch paused_dag_ids
URL: https://github.com/apache/airflow/pull/7476#discussion_r382553186
 
 

 ##########
 File path: airflow/models/dag.py
 ##########
 @@ -1448,63 +1448,88 @@ def create_dagrun(self,
 
         return run
 
+    @classmethod
     @provide_session
-    def sync_to_db(self, owner=None, sync_time=None, session=None):
+    def bulk_sync_to_db(cls, dags: List["DAG"], sync_time=None, session=None):
         """
-        Save attributes about this DAG to the DB. Note that this method
+        Save attributes about list of DAG to the DB. Note that this method
         can be called for both DAGs and SubDAGs. A SubDag is actually a
         SubDagOperator.
 
-        :param dag: the DAG object to save to the DB
-        :type dag: airflow.models.DAG
+        :param dags: the DAG objects to save to the DB
+        :type dags: List[airflow.models.dag.DAG]
         :param sync_time: The time that the DAG should be marked as sync'ed
         :type sync_time: datetime
         :return: None
         """
+        if not dags:
+            return
         from airflow.models.serialized_dag import SerializedDagModel
 
-        if owner is None:
-            owner = self.owner
         if sync_time is None:
             sync_time = timezone.utcnow()
-
-        orm_dag = session.query(
-            DagModel).filter(DagModel.dag_id == self.dag_id).first()
-        if not orm_dag:
-            orm_dag = DagModel(dag_id=self.dag_id)
-            if self.is_paused_upon_creation is not None:
-                orm_dag.is_paused = self.is_paused_upon_creation
-            self.log.info("Creating ORM DAG for %s", self.dag_id)
+        log.info("Sync %s DAGs", len(dags))
+        dag_by_ids = {dag.dag_id: dag for dag in dags}
+        dag_ids = set(dag_by_ids.keys())
+        orm_dags = session.query(DagModel)\
+            .options(
+            joinedload(DagModel.tags, innerjoin=False)
 
 Review comment:
   Why do we need this?
