Posted to commits@airflow.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2020/02/21 00:02:00 UTC

[jira] [Commented] (AIRFLOW-6857) Bulk sync DAGs

    [ https://issues.apache.org/jira/browse/AIRFLOW-6857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17041382#comment-17041382 ] 

ASF GitHub Bot commented on AIRFLOW-6857:
-----------------------------------------

mik-laj commented on pull request #7477: [AIRFLOW-6857][depends on AIRFLOW-6856] Bulk sync DAGs
URL: https://github.com/apache/airflow/pull/7477
 
 
   I created the following DAG file:
   ```python
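   from datetime import timedelta

   from airflow import DAG
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.utils.dates import days_ago

   args = {
       'owner': 'airflow',
       'start_date': days_ago(3),
   }


   def create_dag(dag_number):
       dag = DAG(
           dag_id=f'perf_50_dag_dummy_tasks_{dag_number}_of_50', default_args=args,
           schedule_interval=None,
           dagrun_timeout=timedelta(minutes=60)
       )

       # Note: range(1, 5) yields 4 tasks per DAG (task_1_of_5 .. task_4_of_5).
       for j in range(1, 5):
           DummyOperator(
               task_id='task_{}_of_5'.format(j),
               dag=dag
           )

       return dag


   # Note: range(1, 200) yields 199 DAGs; each is registered as a module-level
   # global so that the DAG file processor can discover it.
   for i in range(1, 200):
       globals()[f"dag_{i}"] = create_dag(i)
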
   ```
   I then used the following code to measure performance:
   ```python
   import functools
   import logging
   
   import time
   
   from airflow.jobs.scheduler_job import DagFileProcessor
   
   
   class CountQueries(object):
       """Context manager that counts cursor executions on Airflow's SQLAlchemy engine."""

       def __init__(self):
           self.count = 0

       def __enter__(self):
           from sqlalchemy import event
           from airflow.settings import engine
           # The listener is called after each cursor execute call on the engine.
           event.listen(engine, "after_cursor_execute", self.after_cursor_execute)
           return None

       def after_cursor_execute(self, *args, **kwargs):
           self.count += 1

       def __exit__(self, type, value, traceback):
           from sqlalchemy import event
           from airflow.settings import engine
           # Detach the listener so later runs start counting from a clean state.
           event.remove(engine, "after_cursor_execute", self.after_cursor_execute)
           print('Query count: ', self.count)
   
   
   count_queries = CountQueries
   
   DAG_FILE = "/files/dags/200_dag_5_dummy_tasks.py"
   
   log = logging.getLogger("airflow.processor")
   processor = DagFileProcessor([], log)
   
   def timing(f):
       """Run ``f`` RETRY_COUNT times and print the average wall-clock time in ms."""

       @functools.wraps(f)
       def wrap(*args):
           RETRY_COUNT = 5
           r = []  # per-run durations in milliseconds
           for i in range(RETRY_COUNT):
               time1 = time.time()
               f(*args)
               time2 = time.time()
               diff = (time2 - time1) * 1000.0
               r.append(diff)
               # print('Retry %d took %0.3f ms' % (i, diff))
           print('Average took %0.3f ms' % (sum(r) / RETRY_COUNT))

       return wrap
   
   
   @timing
   def slow_case():
       with count_queries():
           processor.process_file(DAG_FILE, None, pickle_dags=False)
   
   slow_case()
   ```
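
The query-counting idea above can be tried without an Airflow installation. The following is a minimal sketch, not the code used for the measurements: it swaps SQLAlchemy's `after_cursor_execute` event for the standard library's `sqlite3.Connection.set_trace_callback`, and the `dag` table, the `CountStatements` class, and the 50 IDs are all hypothetical. It only illustrates why replacing per-row lookups with one bulk query lowers the statement count. It makes no claim about how this PR implements bulk sync.

```python
import sqlite3


class CountStatements:
    """Count SQL statements executed on one sqlite3 connection (illustrative)."""

    def __init__(self, conn):
        self.conn = conn
        self.count = 0

    def __enter__(self):
        # sqlite3 invokes the trace callback for each statement it runs.
        self.conn.set_trace_callback(self._on_statement)
        return self

    def _on_statement(self, statement):
        self.count += 1

    def __exit__(self, exc_type, exc_value, tb):
        # Passing None removes the callback again.
        self.conn.set_trace_callback(None)


# Hypothetical table holding 50 DAG IDs.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY)")
dag_ids = [f"dag_{i}" for i in range(50)]
conn.executemany("INSERT INTO dag VALUES (?)", [(d,) for d in dag_ids])
conn.commit()

# Per-row access: one SELECT per DAG ID.
with CountStatements(conn) as per_row:
    for dag_id in dag_ids:
        conn.execute(
            "SELECT dag_id FROM dag WHERE dag_id = ?", (dag_id,)
        ).fetchall()

# Bulk access: a single SELECT ... IN (...) for all DAG IDs.
with CountStatements(conn) as bulk:
    placeholders = ",".join("?" * len(dag_ids))
    conn.execute(
        f"SELECT dag_id FROM dag WHERE dag_id IN ({placeholders})", dag_ids
    ).fetchall()

print("per-row statements:", per_row.count)
print("bulk statements:", bulk.count)
```

The per-row loop issues one statement per ID, while the bulk variant issues a single statement regardless of how many IDs are requested.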
   I also cherry-picked AIRFLOW-6856.
   
   As a result, I obtained the following values:
   
   **Master**:
   Query count: 1792
   Average took 4505.891 ms
   
   **AIRFLOW-6856:**
   Query count: 1197
   Average took 3203.710 ms
   
   **Current:**
   Query count: 602
   Average took 2018.891 ms
   
   **Diff to AIRFLOW-6856:**
   Query count: -595 (-50%)
   Average time: -1185 ms (-37%)
   
   **Diff to master:**
   Query count: -1190 (-66%)
   Average time: -2487 ms (-55%)
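
As a sanity check, the deltas can be recomputed from the three measurements listed above. This is a small sketch that uses only the reported numbers. The `diff` helper and the variable names are introduced here for illustration.

```python
# Measurements as reported above: (query count, average time in ms).
master = (1792, 4505.891)
airflow_6856 = (1197, 3203.710)
current = (602, 2018.891)


def diff(new, base):
    """Return (absolute change, percent change) of `new` relative to `base`."""
    delta = new - base
    return delta, 100.0 * delta / base


for label, base in (("AIRFLOW-6856", airflow_6856), ("master", master)):
    q_delta, q_pct = diff(current[0], base[0])
    t_delta, t_pct = diff(current[1], base[1])
    print(
        f"Diff to {label}: queries {q_delta:+d} ({q_pct:+.0f}%), "
        f"time {t_delta:+.0f} ms ({t_pct:+.0f}%)"
    )
```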
   
   Thanks to @evgenyshulman from Databand for the support!
   
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Bulk sync DAGs
> --------------
>
>                 Key: AIRFLOW-6857
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6857
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>    Affects Versions: 1.10.9
>            Reporter: Kamil Bregula
>            Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)