Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/01/06 15:59:22 UTC

[GitHub] [airflow] ashb opened a new pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 15-40%

ashb opened a new pull request #20722:
URL: https://github.com/apache/airflow/pull/20722


   This uses SQLAlchemy's "bulk" operation API to get a big speed-up.
   Because of the `task_instance_mutation_hook` we still need to keep
   actual TaskInstance objects around.
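   This fast/slow split can be sketched roughly as follows (function and field names are illustrative, not Airflow's actual internals):

```python
# Sketch: when the task_instance_mutation_hook is the default no-op, plain
# dicts suffice and can go through session.bulk_insert_mappings();
# otherwise real objects must be built so the hook can mutate each one.

def default_hook(ti):
    """Stand-in for the no-op default task_instance_mutation_hook."""

def build_rows(tasks, run_id, hook=default_hook):
    if hook is default_hook:
        # Fast path: mappings suitable for bulk_insert_mappings(TI, rows)
        return [{"task_id": t, "run_id": run_id, "_try_number": 0} for t in tasks]
    # Slow path: build each "object" (a dict stands in for a TaskInstance
    # here) and run the hook on it, then bulk_save_objects() the results.
    rows = []
    for t in tasks:
        ti = {"task_id": t, "run_id": run_id, "_try_number": 0}
        hook(ti)
        rows.append(ti)
    return rows
```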
   
   For PostgreSQL we have enabled the "batch operation helpers"[1], which
   make it even faster. The default page sizes are chosen somewhat
   arbitrarily based on the SQLAlchemy docs.
   
   To make these page sizes configurable I have added (and used here and in
   KubeConfig) a new `getjson` method on the AirflowConfigParser class.
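   A minimal sketch of such a helper (hypothetical stand-in; the real AirflowConfigParser method also handles env-var overrides and richer error reporting):

```python
import json
from configparser import ConfigParser, NoOptionError, NoSectionError

_UNSET = object()

class MiniConfig(ConfigParser):
    """Hypothetical stand-in for AirflowConfigParser with a getjson method."""

    def getjson(self, section, key, fallback=_UNSET):
        try:
            data = self.get(section, key)
        except (NoSectionError, NoOptionError):
            if fallback is _UNSET:
                raise
            return fallback  # returned verbatim, never JSON-parsed
        if not data:
            return None if fallback is _UNSET else fallback
        return json.loads(data)
```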
   
   *PostgreSQL is over 40% faster*:
   
   Before:
   
   ```
   number_of_tis=1 mean=0.004397215199423954 per=0.004397215199423954 times=[0.009390181003254838, 0.002814065999700688, 0.00284132499655243, 0.0036120269942330196, 0.0033284770033787936]
   number_of_tis=10 mean=0.008078816600027494 per=0.0008078816600027494 times=[0.011014281000825576, 0.008476420000079088, 0.00741832799394615, 0.006857775995740667, 0.006627278009545989]
   number_of_tis=50 mean=0.01927847799670417 per=0.00038556955993408336 times=[0.02556803499464877, 0.01935569499619305, 0.01662322599440813, 0.01840184700267855, 0.01644358699559234]
   number_of_tis=100 mean=0.03301511880126782 per=0.00033015118801267817 times=[0.04117956099798903, 0.030890661000739783, 0.03007458901265636, 0.03125198099587578, 0.03167880199907813]
   number_of_tis=500 mean=0.15320950179593637 per=0.0003064190035918727 times=[0.20054609200451523, 0.14052859699586406, 0.14509809199080337, 0.1365471329918364, 0.1433275949966628]
   number_of_tis=1000 mean=0.2929377429973101 per=0.0002929377429973101 times=[0.3517978919990128, 0.2807794280088274, 0.2806490379880415, 0.27710555399244186, 0.27435680299822707]
   number_of_tis=3000 mean=0.9935687056015012 per=0.00033118956853383374 times=[1.2047388390055858, 0.8248025969951414, 0.8685875020019012, 0.9017027500085533, 1.1680118399963249]
   number_of_tis=5000 mean=1.5349355740036117 per=0.00030698711480072236 times=[1.8663743910001358, 1.5182018500054255, 1.5446484510030132, 1.3932801040064078, 1.3521730740030762]
   number_of_tis=10000 mean=3.7448632712010292 per=0.0003744863271201029 times=[4.135914924001554, 3.4411147559876554, 3.526543836007477, 3.7195197630062466, 3.9012230770022143]
   number_of_tis=15000 mean=6.3099766838044165 per=0.00042066511225362775 times=[6.552250057997298, 6.1369703890086384, 6.8749958210100885, 6.067943914007628, 5.917723236998427]
   number_of_tis=20000 mean=8.317583500797628 per=0.00041587917503988143 times=[8.720249108009739, 8.0188543760014, 8.328030352990027, 8.398350054994808, 8.122433611992165]
   ```
   
   After:
   
   ```
   number_of_tis=1 mean=0.026246879794052803 per=0.026246879794052803 times=[0.031441625993466005, 0.025166517996694893, 0.02518146399233956, 0.024703859991859645, 0.02474093099590391]
   number_of_tis=10 mean=0.02652196400158573 per=0.002652196400158573 times=[0.027266821009106934, 0.026017504002084024, 0.02769480799906887, 0.025840838003205135, 0.025789848994463682]
   number_of_tis=50 mean=0.032463929001824 per=0.00064927858003648 times=[0.03659850900294259, 0.03128377899702173, 0.03133225999772549, 0.030985830002464354, 0.032119267008965835]
   number_of_tis=100 mean=0.03862043260014616 per=0.0003862043260014616 times=[0.04082123900298029, 0.03752484500000719, 0.037281844997778535, 0.03927708099945448, 0.0381971530005103]
   number_of_tis=500 mean=0.10123570079740603 per=0.00020247140159481206 times=[0.11780315199575853, 0.09932849500910379, 0.10016329499194399, 0.09410478499194141, 0.09477877699828241]
   number_of_tis=1000 mean=0.17536458960094023 per=0.00017536458960094024 times=[0.20034298300743103, 0.17775658299797215, 0.17178491500089876, 0.16488367799320258, 0.16205478900519665]
   number_of_tis=3000 mean=0.5013463032053551 per=0.00016711543440178504 times=[0.6868100110004889, 0.46566563300439157, 0.44849480800621677, 0.4379984680126654, 0.46776259600301273]
   number_of_tis=5000 mean=0.840471555799013 per=0.0001680943111598026 times=[1.0285392189980485, 0.8854761679976946, 0.7579579270095564, 0.730956947998493, 0.7994275169912726]
   number_of_tis=10000 mean=1.975292908004485 per=0.0001975292908004485 times=[1.9648507620004239, 1.8537165410089074, 1.8826112380047562, 1.9254138420074014, 2.2498721570009366]
   number_of_tis=15000 mean=3.4746556333935588 per=0.00023164370889290392 times=[4.0400224499899196, 3.1751998239924433, 3.6206128539924975, 3.6852884859981714, 2.8521545529947616]
   number_of_tis=20000 mean=4.678154367001843 per=0.00023390771835009216 times=[4.465847548010061, 4.571855771995615, 4.749505186002352, 4.724330568002188, 4.8792327609990025]
   ```
   
   MySQL is only 10-15% faster (and a lot noisier).
   
   Before:
   
   ```
   number_of_tis=1 mean=0.006164804595755413 per=0.006164804595755413 times=[0.013516580002033152, 0.00427598599344492, 0.004508020996581763, 0.004067091998877004, 0.004456343987840228]
   number_of_tis=10 mean=0.007822793803643435 per=0.0007822793803643434 times=[0.0081135170039488, 0.00719467100861948, 0.009007985994685441, 0.00758794900320936, 0.007209846007754095]
   number_of_tis=50 mean=0.020377356800599954 per=0.00040754713601199905 times=[0.02612382399092894, 0.018950315003166907, 0.019109474000288174, 0.018008680999628268, 0.019694490008987486]
   number_of_tis=100 mean=0.040682651600218375 per=0.00040682651600218374 times=[0.05449078499805182, 0.037430580996442586, 0.039291110006161034, 0.03625023599306587, 0.035950546007370576]
   number_of_tis=500 mean=0.18646696420037187 per=0.00037293392840074375 times=[0.24278165798750706, 0.17090376401029062, 0.1837275660072919, 0.16893767600413412, 0.1659841569926357]
   number_of_tis=1000 mean=0.5903461098030676 per=0.0005903461098030675 times=[0.6001852740009781, 0.5642872750031529, 0.686630773008801, 0.5578094649972627, 0.5428177620051429]
   number_of_tis=3000 mean=1.9076304554007948 per=0.0006358768184669316 times=[2.042052763994434, 2.1137778090051142, 1.7461599689995637, 1.7260139089921722, 1.9101478260126896]
   number_of_tis=5000 mean=2.9185905692051164 per=0.0005837181138410233 times=[2.9221124830073677, 3.2889883980096783, 2.7569778940087417, 2.973596281008213, 2.651277789991582]
   number_of_tis=10000 mean=8.880191986600403 per=0.0008880191986600403 times=[7.3548113360011484, 9.13715232499817, 9.568511486999341, 8.80206210000324, 9.538422685000114]
   number_of_tis=15000 mean=15.426499317999696 per=0.0010284332878666464 times=[14.944712879005237, 15.38737604500784, 15.409629273999599, 15.852925243991194, 15.53785314799461]
   number_of_tis=20000 mean=20.579332908798825 per=0.0010289666454399414 times=[20.362008597003296, 19.878823954990366, 20.73281196100288, 20.837948996995692, 21.085071034001885]
   ```
   
   After:
   
   ```
   number_of_tis=1 mean=0.04114753239555284 per=0.04114753239555284 times=[0.05534043599618599, 0.03716265498951543, 0.039479082988691516, 0.03779561800183728, 0.035959870001534]
   number_of_tis=10 mean=0.038440523599274454 per=0.003844052359927445 times=[0.03949839199776761, 0.03853203100152314, 0.03801383898826316, 0.03784418400027789, 0.03831417200854048]
   number_of_tis=50 mean=0.05345874359773006 per=0.0010691748719546012 times=[0.07045628099876922, 0.04431965999538079, 0.06068256100115832, 0.04566028399858624, 0.04617493199475575]
   number_of_tis=100 mean=0.06805712619971019 per=0.0006805712619971019 times=[0.07946423999965191, 0.06054415399557911, 0.06277450300694909, 0.07836744099040516, 0.05913529300596565]
   number_of_tis=500 mean=0.17929348759935237 per=0.00035858697519870476 times=[0.2792787920043338, 0.16563376400154084, 0.14093860499269795, 0.1464673139998922, 0.16414896299829707]
   number_of_tis=1000 mean=0.3883620931970654 per=0.00038836209319706536 times=[0.47511668599327095, 0.3506359229941154, 0.43458069299231283, 0.33563552900159266, 0.3458416350040352]
   number_of_tis=3000 mean=1.3977356655988842 per=0.0004659118885329614 times=[1.575020256001153, 1.3353702509921277, 1.4193720350012882, 1.4037733709992608, 1.2551424150005914]
   number_of_tis=5000 mean=2.3742491033975965 per=0.0004748498206795193 times=[2.4926851909986, 2.501419166001142, 2.2862377730052685, 2.4421103859931463, 2.1487930009898264]
   number_of_tis=10000 mean=8.138347979800892 per=0.0008138347979800893 times=[6.648954969001352, 8.001181932995678, 8.551437315007206, 9.084980526997242, 8.405185155002982]
   number_of_tis=15000 mean=14.065810968197184 per=0.0009377207312131455 times=[13.222158194999793, 14.375066226988565, 14.108006285998272, 14.157014351992984, 14.466809781006305]
   number_of_tis=20000 mean=18.36637533060275 per=0.0009183187665301375 times=[17.728908119010157, 18.62269214099797, 18.936747477011522, 17.74613195299753, 18.797396962996572]
   ```
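   The 10-15% claim checks out against the 20,000-TI means in the MySQL tables above (20.58 s before, 18.37 s after):

```python
# 20,000-TI means taken from the MySQL benchmark tables above
before = 20.579332908798825
after = 18.36637533060275

speedup = before / after - 1  # fractional improvement, ~0.12
assert 0.10 < speedup < 0.15
```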
   
   [1]: https://docs.sqlalchemy.org/en/13/dialects/postgresql.html#psycopg2-batch-mode
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 25-140%

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20722:
URL: https://github.com/apache/airflow/pull/20722#discussion_r780223997



##########
File path: airflow/utils/platform.py
##########
@@ -63,6 +65,7 @@ def get_airflow_git_version():
     return git_version
 
 
+@cache

Review comment:
    Scratch that -- I decided to remove the setting of the `unixname` attribute for TaskInstances when they are created -- it doesn't make sense to have a value there until they have been executed once.







[GitHub] [airflow] ashb commented on a change in pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 25-140%

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20722:
URL: https://github.com/apache/airflow/pull/20722#discussion_r780170018



##########
File path: airflow/models/taskinstance.py
##########
@@ -477,6 +477,25 @@ def __init__(
         # can be changed when calling 'run'
         self.test_mode = False
 
+    @staticmethod
+    def insert_mapping(run_id: str, task: "BaseOperator") -> dict:
+        """:meta private:"""
+        return {
+            'dag_id': task.dag_id,
+            'task_id': task.task_id,
+            'run_id': run_id,
+            '_try_number': 0,
+            'unixname': getuser(),
+            'queue': task.queue,
+            'pool': task.pool,
+            'pool_slots': task.pool_slots,
+            'priority_weight': task.priority_weight_total,
+            'run_as_user': task.run_as_user,
+            'max_tries': task.retries,
+            'executor_config': task.executor_config,

Review comment:
       Covered by this existing test in test_taskinstances (and I verified that it is going via the insert_mappings path too):
   
   ```python
       def test_ti_updates_with_task(self, create_task_instance, session=None):
           """
           test that updating the executor_config propagates to the TaskInstance DB
           """
           ti = create_task_instance(
               dag_id='test_run_pooling_task',
               task_id='test_run_pooling_task_op',
               executor_config={'foo': 'bar'},
           )
           dag = ti.task.dag
   
           ti.run(session=session)
           tis = dag.get_task_instances()
           assert {'foo': 'bar'} == tis[0].executor_config
   ```







[GitHub] [airflow] github-actions[bot] commented on pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 25-140%

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #20722:
URL: https://github.com/apache/airflow/pull/20722#issuecomment-1006761768


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] ashb commented on pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 25-140%

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #20722:
URL: https://github.com/apache/airflow/pull/20722#issuecomment-1006762496


   Please let me merge this one -- I want to update the commit message to make sense.





[GitHub] [airflow] ashb commented on a change in pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 25-140%

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20722:
URL: https://github.com/apache/airflow/pull/20722#discussion_r780161896



##########
File path: airflow/models/dagrun.py
##########
@@ -804,18 +806,42 @@ def verify_integrity(self, session: Session = NEW_SESSION):
                 ti.state = State.NONE
             session.merge(ti)
 
-        # check for missing tasks
-        for task in dag.task_dict.values():
-            if task.start_date > self.execution_date and not self.is_backfill:
-                continue
+        def task_filter(task: "BaseOperator"):
+            return task.task_id not in task_ids and (
+                self.is_backfill or task.start_date <= self.execution_date
+            )
+
+        created_counts: Dict[str, int] = Counter()

Review comment:
    I think the slowdown wasn't from using Counter, but just from having to walk the list of 20k items twice. Anyway, defaultdict it is.







[GitHub] [airflow] ashb commented on pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 25-140%

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #20722:
URL: https://github.com/apache/airflow/pull/20722#issuecomment-1007642827


   Static checks failed with 
   
   ```
   docker: error during connect: Post http://%2Fvar%2Frun%2Fdocker.sock/v1.24/containers/create?platform=linux%2Famd64: EOF.
   See 'docker run --help'.
   
   ERROR: The previous step completed with error. Please take a look at output above 
   ```
   
   I ran `pre-commit run -a` and only mypy failed.





[GitHub] [airflow] ashb commented on a change in pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 25-140%

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20722:
URL: https://github.com/apache/airflow/pull/20722#discussion_r780172962



##########
File path: airflow/utils/platform.py
##########
@@ -63,6 +65,7 @@ def get_airflow_git_version():
     return git_version
 
 
+@cache

Review comment:
    I added this as there is zero need to call this 20k times when creating a dag -- it can't change at runtime.
    
    (Okay, the name of the current user _technically_ could change at runtime, but that is very much an edge case; not to mention that the value recorded in the scheduler doesn't matter -- it will be overwritten when the Task executes, and that is the interesting value anyway.)
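    The effect of `@cache` here can be sketched with a stand-in for the user lookup (the call counter is illustrative; the real function wraps the OS user lookup in `airflow/utils/platform.py`):

```python
from functools import lru_cache  # functools.cache (3.9+) is lru_cache(maxsize=None)

calls = {"n": 0}

@lru_cache(maxsize=None)
def get_unixname():
    # Stand-in for the OS user lookup; the counter records real invocations.
    calls["n"] += 1
    return "airflow"

# Creating a 20k-task DagRun would hit this 20k times; cached, the
# underlying lookup runs once per process.
for _ in range(20_000):
    get_unixname()
assert calls["n"] == 1
```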







[GitHub] [airflow] ashb merged pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 25-140%

Posted by GitBox <gi...@apache.org>.
ashb merged pull request #20722:
URL: https://github.com/apache/airflow/pull/20722


   








[GitHub] [airflow] pingzh commented on a change in pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 25-140%

Posted by GitBox <gi...@apache.org>.
pingzh commented on a change in pull request #20722:
URL: https://github.com/apache/airflow/pull/20722#discussion_r780449646



##########
File path: airflow/models/taskinstance.py
##########
@@ -477,6 +477,25 @@ def __init__(
         # can be changed when calling 'run'
         self.test_mode = False
 
+    @staticmethod
+    def insert_mapping(run_id: str, task: "BaseOperator") -> dict:
+        """:meta private:"""
+        return {
+            'dag_id': task.dag_id,
+            'task_id': task.task_id,
+            'run_id': run_id,
+            '_try_number': 0,
+            'unixname': getuser(),
+            'queue': task.queue,
+            'pool': task.pool,
+            'pool_slots': task.pool_slots,
+            'priority_weight': task.priority_weight_total,
+            'run_as_user': task.run_as_user,
+            'max_tries': task.retries,
+            'executor_config': task.executor_config,

Review comment:
       nice, thanks







[GitHub] [airflow] andriisoldatenko commented on a change in pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 25-140%

Posted by GitBox <gi...@apache.org>.
andriisoldatenko commented on a change in pull request #20722:
URL: https://github.com/apache/airflow/pull/20722#discussion_r783372254



##########
File path: airflow/configuration.py
##########
@@ -493,6 +493,32 @@ def getimport(self, section, key, **kwargs):
                 f'Current value: "{full_qualified_path}".'
             )
 
+    def getjson(self, section, key, fallback=_UNSET, **kwargs) -> Union[dict, list, str, int, float, None]:
+        """
+        Return a config value parsed from a JSON string.
+
+        ``fallback`` is *not* JSON parsed but used verbatim when no config value is given.
+        """
+        # get always returns the fallback value as a string, so for this if
+        # someone gives us an object we want to keep that
+        default = _UNSET
+        if fallback is not _UNSET:
+            default = fallback
+            fallback = _UNSET
+
+        try:
+            data = self.get(section=section, key=key, fallback=fallback, **kwargs)
+        except (NoSectionError, NoOptionError):
+            return default
+
+        if len(data) == 0:

Review comment:
       @ashb any reason to not check `if data` instead ?
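    For the `str` values that `ConfigParser.get` returns, the two checks agree; they only differ if `data` could be `None` (a quick sketch):

```python
# For strings, `len(data) == 0` and `not data` are equivalent.
for s in ["", " ", "{}", '{"a": 1}']:
    assert (len(s) == 0) == (not s)

# They diverge for None: `not None` is True, while len(None) raises.
raised = False
try:
    len(None)
except TypeError:
    raised = True
assert raised and (not None)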







[GitHub] [airflow] uranusjr commented on a change in pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 25-140%

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20722:
URL: https://github.com/apache/airflow/pull/20722#discussion_r779976394



##########
File path: airflow/models/dagrun.py
##########
@@ -804,18 +806,42 @@ def verify_integrity(self, session: Session = NEW_SESSION):
                 ti.state = State.NONE
             session.merge(ti)
 
-        # check for missing tasks
-        for task in dag.task_dict.values():
-            if task.start_date > self.execution_date and not self.is_backfill:
-                continue
+        def task_filter(task: "BaseOperator"):
+            return task.task_id not in task_ids and (
+                self.is_backfill or task.start_date <= self.execution_date
+            )
+
+        created_counts: Dict[str, int] = Counter()

Review comment:
       Since we are just manually incrementing the counts anyway, it would be better to use a `defaultdict(int)` here instead.
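    The difference is minor, but the two approaches can be sketched side by side (the task types are made up):

```python
from collections import Counter, defaultdict

task_types = ["PythonOperator", "BashOperator", "PythonOperator"]

# Manual increments: defaultdict(int) skips Counter's extra machinery.
created_counts = defaultdict(int)
for t in task_types:
    created_counts[t] += 1

# Counter shines when counting a whole iterable in one call;
# dict equality ignores the subclass, so the results compare equal.
assert Counter(task_types) == created_counts
```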







[GitHub] [airflow] uranusjr commented on a change in pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 25-140%

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20722:
URL: https://github.com/apache/airflow/pull/20722#discussion_r779977566



##########
File path: airflow/models/dagrun.py
##########
@@ -804,18 +806,42 @@ def verify_integrity(self, session: Session = NEW_SESSION):
                 ti.state = State.NONE
             session.merge(ti)
 
-        # check for missing tasks
-        for task in dag.task_dict.values():
-            if task.start_date > self.execution_date and not self.is_backfill:
-                continue
+        def task_filter(task: "BaseOperator"):
+            return task.task_id not in task_ids and (
+                self.is_backfill or task.start_date <= self.execution_date
+            )
+
+        created_counts: Dict[str, int] = Counter()

Review comment:
       But `Counter` would be more useful if you change the below to something like
   
   ```python
   def create_ti(task: "BaseOperator") -> TI:
       ti = TI(task, run_id=self.run_id)
       task_instance_mutation_hook(ti)
       return ti
   
   if hook_is_noop:
       session.bulk_insert_mappings(TI, (TI.insert_mapping(self.run_id, t) for t in tasks))
   else:
       session.bulk_save_objects(create_ti(t) for t in tasks)
   created_counts.update(t.task_type for t in tasks)
   ```







[GitHub] [airflow] ashb commented on pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 15-40%

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #20722:
URL: https://github.com/apache/airflow/pull/20722#issuecomment-1006730217


   And I've found another 30% or so for postgres (testing MySQL) so it's now _over twice as fast_ at creating dag runs.





[GitHub] [airflow] ashb commented on pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 15-40%

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #20722:
URL: https://github.com/apache/airflow/pull/20722#issuecomment-1006729842


   Guess who can't do maths. Postgres is 77% quicker, not 40%.
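   The corrected figure checks out against the 20,000-TI means in the PR description (8.32 s before, 4.68 s after):

```python
before = 8.317583500797628  # 20,000-TI mean, before
after = 4.678154367001843   # 20,000-TI mean, after

speedup = before / after - 1
assert 0.75 < speedup < 0.80  # ~0.78, i.e. "77% quicker"
```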





[GitHub] [airflow] ashb commented on a change in pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 25-140%

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20722:
URL: https://github.com/apache/airflow/pull/20722#discussion_r780129526



##########
File path: airflow/models/dagrun.py
##########
@@ -804,18 +806,42 @@ def verify_integrity(self, session: Session = NEW_SESSION):
                 ti.state = State.NONE
             session.merge(ti)
 
-        # check for missing tasks
-        for task in dag.task_dict.values():
-            if task.start_date > self.execution_date and not self.is_backfill:
-                continue
+        def task_filter(task: "BaseOperator"):
+            return task.task_id not in task_ids and (
+                self.is_backfill or task.start_date <= self.execution_date
+            )
+
+        created_counts: Dict[str, int] = Counter()

Review comment:
    Yeah, I had it this way before (hence the Counter in the first place), but doing it this way seemed a lot slower. Let me test that again.

##########
File path: airflow/models/dagrun.py
##########
@@ -804,18 +806,42 @@ def verify_integrity(self, session: Session = NEW_SESSION):
                 ti.state = State.NONE
             session.merge(ti)
 
-        # check for missing tasks
-        for task in dag.task_dict.values():
-            if task.start_date > self.execution_date and not self.is_backfill:
-                continue
+        def task_filter(task: "BaseOperator"):
+            return task.task_id not in task_ids and (
+                self.is_backfill or task.start_date <= self.execution_date
+            )
+
+        created_counts: Dict[str, int] = Counter()

Review comment:
    I think the slowdown wasn't from using Counter, but just from having to walk the list of 20k items twice. Anyway, defaultdict it is.

##########
File path: airflow/models/taskinstance.py
##########
@@ -477,6 +477,25 @@ def __init__(
         # can be changed when calling 'run'
         self.test_mode = False
 
+    @staticmethod
+    def insert_mapping(run_id: str, task: "BaseOperator") -> dict:
+        """:meta private:"""
+        return {
+            'dag_id': task.dag_id,
+            'task_id': task.task_id,
+            'run_id': run_id,
+            '_try_number': 0,
+            'unixname': getuser(),
+            'queue': task.queue,
+            'pool': task.pool,
+            'pool_slots': task.pool_slots,
+            'priority_weight': task.priority_weight_total,
+            'run_as_user': task.run_as_user,
+            'max_tries': task.retries,
+            'executor_config': task.executor_config,

Review comment:
       Covered by this existing test in test_taskinstances (and I verified that it is going via the insert_mappings path too):
   
   ```python
       def test_ti_updates_with_task(self, create_task_instance, session=None):
           """
           test that updating the executor_config propagates to the TaskInstance DB
           """
           ti = create_task_instance(
               dag_id='test_run_pooling_task',
               task_id='test_run_pooling_task_op',
               executor_config={'foo': 'bar'},
           )
           dag = ti.task.dag
   
           ti.run(session=session)
           tis = dag.get_task_instances()
           assert {'foo': 'bar'} == tis[0].executor_config
   ```

##########
File path: airflow/utils/platform.py
##########
@@ -63,6 +65,7 @@ def get_airflow_git_version():
     return git_version
 
 
+@cache

Review comment:
    I added this as there is zero need to call this 20k times when creating a dag -- it can't change at runtime.
    
    (Okay, the name _technically_ could change at runtime, but that is very much an edge case; not to mention that the value recorded in the scheduler doesn't matter -- it will be overwritten when the Task executes, and that is the interesting value anyway.)

##########
File path: airflow/utils/platform.py
##########
@@ -63,6 +65,7 @@ def get_airflow_git_version():
     return git_version
 
 
+@cache

Review comment:
       I added this as there is 0 need to call this 20k times when creating a dag -- it can't change at runtime.
   
   (Okay, the name of the current user _technically_ could change at runtime, but that is very much an edge case; besides, the value created in the scheduler doesn't matter, as it will be overwritten when the Task executes, and that is the interesting value anyway.)

##########
File path: airflow/utils/platform.py
##########
@@ -63,6 +65,7 @@ def get_airflow_git_version():
     return git_version
 
 
+@cache

Review comment:
       Scratch that -- I decided to remove setting the `unixname` attribute on TaskInstances when they are created -- it doesn't make sense to have a value there until they have been executed once.

##########
File path: airflow/utils/platform.py
##########
@@ -63,6 +65,7 @@ def get_airflow_git_version():
     return git_version
 
 
+@cache

Review comment:
       Aaand reverted the "remove setting unixname" change, as the API schema _requires_ a value for unixname, so changing that would be a bigger/separate change.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] pingzh commented on a change in pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 25-140%

Posted by GitBox <gi...@apache.org>.
pingzh commented on a change in pull request #20722:
URL: https://github.com/apache/airflow/pull/20722#discussion_r779743219



##########
File path: airflow/models/taskinstance.py
##########
@@ -477,6 +477,25 @@ def __init__(
         # can be changed when calling 'run'
         self.test_mode = False
 
+    @staticmethod
+    def insert_mapping(run_id: str, task: "BaseOperator") -> dict:
+        """:meta private:"""
+        return {
+            'dag_id': task.dag_id,
+            'task_id': task.task_id,
+            'run_id': run_id,
+            '_try_number': 0,
+            'unixname': getuser(),
+            'queue': task.queue,
+            'pool': task.pool,
+            'pool_slots': task.pool_slots,
+            'priority_weight': task.priority_weight_total,
+            'run_as_user': task.run_as_user,
+            'max_tries': task.retries,
+            'executor_config': task.executor_config,

Review comment:
       qq, i have only used `bulk_save_objects`. will the `executor_config` be correctly serialized?







[GitHub] [airflow] pingzh commented on pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 25-140%

Posted by GitBox <gi...@apache.org>.
pingzh commented on pull request #20722:
URL: https://github.com/apache/airflow/pull/20722#issuecomment-1006793209


   nice, we have been using `bulk_save_objects` for a while, which saved us around 30 seconds in production when creating a large dag run (from ~45 seconds to ~17 seconds)
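   For context, the `bulk_save_objects` approach mentioned here looks roughly like this (the `Task` model below is illustrative, not Airflow's TaskInstance):

   ```python
   from sqlalchemy import Column, Integer, String, create_engine
   from sqlalchemy.orm import Session, declarative_base

   Base = declarative_base()


   class Task(Base):
       """Illustrative model; not Airflow's."""

       __tablename__ = "task"
       id = Column(Integer, primary_key=True)
       name = Column(String)


   engine = create_engine("sqlite://")
   Base.metadata.create_all(engine)

   with Session(engine) as session:
       # bulk_save_objects skips most per-object unit-of-work bookkeeping,
       # emitting batched INSERT statements instead
       session.bulk_save_objects([Task(id=i, name=f"task_{i}") for i in range(1000)])
       session.commit()
       inserted = session.query(Task).count()
   ```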





[GitHub] [airflow] ashb commented on pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 25-140%

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #20722:
URL: https://github.com/apache/airflow/pull/20722#issuecomment-1006818253


   Maybe this change will give more of a speed-up on MySQL in practice than I got in my testing
   
   🤞🏻





[GitHub] [airflow] ashb commented on a change in pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 25-140%

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20722:
URL: https://github.com/apache/airflow/pull/20722#discussion_r780129526



##########
File path: airflow/models/dagrun.py
##########
@@ -804,18 +806,42 @@ def verify_integrity(self, session: Session = NEW_SESSION):
                 ti.state = State.NONE
             session.merge(ti)
 
-        # check for missing tasks
-        for task in dag.task_dict.values():
-            if task.start_date > self.execution_date and not self.is_backfill:
-                continue
+        def task_filter(task: "BaseOperator"):
+            return task.task_id not in task_ids and (
+                self.is_backfill or task.start_date <= self.execution_date
+            )
+
+        created_counts: Dict[str, int] = Counter()

Review comment:
       Yeah, I had it this way before (hence the Counter in the first place) but doing it this way seemed a lot slower. Let me test that again.







[GitHub] [airflow] ashb commented on a change in pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 25-140%

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20722:
URL: https://github.com/apache/airflow/pull/20722#discussion_r780367513



##########
File path: airflow/utils/platform.py
##########
@@ -63,6 +65,7 @@ def get_airflow_git_version():
     return git_version
 
 
+@cache

Review comment:
       Aaand reverted the "remove setting unixname" change, as the API schema _requires_ a value for unixname, so changing that would be a bigger/separate change.







[GitHub] [airflow] ashb commented on a change in pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 25-140%

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20722:
URL: https://github.com/apache/airflow/pull/20722#discussion_r779743795



##########
File path: airflow/models/taskinstance.py
##########
@@ -477,6 +477,25 @@ def __init__(
         # can be changed when calling 'run'
         self.test_mode = False
 
+    @staticmethod
+    def insert_mapping(run_id: str, task: "BaseOperator") -> dict:
+        """:meta private:"""
+        return {
+            'dag_id': task.dag_id,
+            'task_id': task.task_id,
+            'run_id': run_id,
+            '_try_number': 0,
+            'unixname': getuser(),
+            'queue': task.queue,
+            'pool': task.pool,
+            'pool_slots': task.pool_slots,
+            'priority_weight': task.priority_weight_total,
+            'run_as_user': task.run_as_user,
+            'max_tries': task.retries,
+            'executor_config': task.executor_config,

Review comment:
       Good question - will test (or see if existing tests cover this)







[GitHub] [airflow] ashb commented on a change in pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 25-140%

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20722:
URL: https://github.com/apache/airflow/pull/20722#discussion_r780172962



##########
File path: airflow/utils/platform.py
##########
@@ -63,6 +65,7 @@ def get_airflow_git_version():
     return git_version
 
 
+@cache

Review comment:
       I added this as there is 0 need to call this 20k times when creating a dag -- it can't change at runtime.
   
   (Okay, it _technically_ could change the name at runtime, but that is very much an edge case; besides, the value created in the scheduler doesn't matter, as it will be overwritten when the Task executes, and that is the interesting value anyway.)







[GitHub] [airflow] uranusjr commented on a change in pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 25-140%

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20722:
URL: https://github.com/apache/airflow/pull/20722#discussion_r779977566



##########
File path: airflow/models/dagrun.py
##########
@@ -804,18 +806,42 @@ def verify_integrity(self, session: Session = NEW_SESSION):
                 ti.state = State.NONE
             session.merge(ti)
 
-        # check for missing tasks
-        for task in dag.task_dict.values():
-            if task.start_date > self.execution_date and not self.is_backfill:
-                continue
+        def task_filter(task: "BaseOperator"):
+            return task.task_id not in task_ids and (
+                self.is_backfill or task.start_date <= self.execution_date
+            )
+
+        created_counts: Dict[str, int] = Counter()

Review comment:
       But `Counter` would be more useful if you change the code below to something like
   
   ```python
   # create_ti_mapping is no longer needed.
   
   def create_ti(task: "BaseOperator") -> TI:
       ti = TI(task, run_id=self.run_id)
       task_instance_mutation_hook(ti)
       return ti
   
   if hook_is_noop:
       session.bulk_insert_mappings(TI, (TI.insert_mapping(self.run_id, t) for t in tasks))
   else:
       session.bulk_save_objects(create_ti(t) for t in tasks)
   
   created_counts.update(t.task_type for t in tasks)
   ```







[GitHub] [airflow] ashb commented on pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 25-140%

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #20722:
URL: https://github.com/apache/airflow/pull/20722#issuecomment-1007642827


   Static checks failed with 
   
   ```
   docker: error during connect: Post http://%2Fvar%2Frun%2Fdocker.sock/v1.24/containers/create?platform=linux%2Famd64: EOF.
   See 'docker run --help'.
   
   ERROR: The previous step completed with error. Please take a look at output above 
   ```
   
   I ran `pre-commit run -a` and only mypy failed.





[GitHub] [airflow] ashb commented on pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 15-40%

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #20722:
URL: https://github.com/apache/airflow/pull/20722#issuecomment-1006704359


   /cc @pingzh @KevinYang21 @potiuk 





[GitHub] [airflow] pingzh commented on a change in pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 25-140%

Posted by GitBox <gi...@apache.org>.
pingzh commented on a change in pull request #20722:
URL: https://github.com/apache/airflow/pull/20722#discussion_r780449646



##########
File path: airflow/models/taskinstance.py
##########
@@ -477,6 +477,25 @@ def __init__(
         # can be changed when calling 'run'
         self.test_mode = False
 
+    @staticmethod
+    def insert_mapping(run_id: str, task: "BaseOperator") -> dict:
+        """:meta private:"""
+        return {
+            'dag_id': task.dag_id,
+            'task_id': task.task_id,
+            'run_id': run_id,
+            '_try_number': 0,
+            'unixname': getuser(),
+            'queue': task.queue,
+            'pool': task.pool,
+            'pool_slots': task.pool_slots,
+            'priority_weight': task.priority_weight_total,
+            'run_as_user': task.run_as_user,
+            'max_tries': task.retries,
+            'executor_config': task.executor_config,

Review comment:
       nice, thanks







[GitHub] [airflow] uranusjr commented on a change in pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 25-140%

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20722:
URL: https://github.com/apache/airflow/pull/20722#discussion_r780134034



##########
File path: airflow/models/dagrun.py
##########
@@ -804,18 +806,42 @@ def verify_integrity(self, session: Session = NEW_SESSION):
                 ti.state = State.NONE
             session.merge(ti)
 
-        # check for missing tasks
-        for task in dag.task_dict.values():
-            if task.start_date > self.execution_date and not self.is_backfill:
-                continue
+        def task_filter(task: "BaseOperator"):
+            return task.task_id not in task_ids and (
+                self.is_backfill or task.start_date <= self.execution_date
+            )
+
+        created_counts: Dict[str, int] = Counter()

Review comment:
       If Counter is slower (or comparable) I’d just use defaultdict. Counter is cool, but generally does not produce better code, from my experience, and this is no exception.
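       The trade-off reads as follows in a toy comparison (variable names are illustrative):

   ```python
   from collections import Counter, defaultdict

   task_types = ["PythonOperator", "BashOperator", "PythonOperator"]

   # Counter: concise, and supports bulk update() with any iterable
   counter_counts = Counter(task_types)

   # defaultdict: a plain dict with a default factory; arguably more explicit
   default_counts = defaultdict(int)
   for t in task_types:
       default_counts[t] += 1

   # Both end up with the same mapping of task type -> count
   ```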









[GitHub] [airflow] potiuk commented on a change in pull request #20722: Speed up creation of DagRun for large DAGs (5k+ tasks) by 25-140%

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #20722:
URL: https://github.com/apache/airflow/pull/20722#discussion_r783382420



##########
File path: airflow/configuration.py
##########
@@ -493,6 +493,32 @@ def getimport(self, section, key, **kwargs):
                 f'Current value: "{full_qualified_path}".'
             )
 
+    def getjson(self, section, key, fallback=_UNSET, **kwargs) -> Union[dict, list, str, int, float, None]:
+        """
+        Return a config value parsed from a JSON string.
+
+        ``fallback`` is *not* JSON parsed but used verbatim when no config value is given.
+        """
+        # get always returns the fallback value as a string, so for this if
+        # someone gives us an object we want to keep that
+        default = _UNSET
+        if fallback is not _UNSET:
+            default = fallback
+            fallback = _UNSET
+
+        try:
+            data = self.get(section=section, key=key, fallback=fallback, **kwargs)
+        except (NoSectionError, NoOptionError):
+            return default
+
+        if len(data) == 0:

Review comment:
       @andriisoldatenko -> I also prefer an explicit len check in such cases. First of all you'd have to write `if not data`, and negation in a condition requires one more mental hoop to understand what it does; secondly, len expresses the intention much more explicitly (`if not data` is also true when data is None).
   
   Shorter does not always mean more readable.
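   A standalone sketch of the `getjson` pattern under review (simplified; not the actual Airflow implementation):

   ```python
   import json
   from configparser import ConfigParser, NoOptionError, NoSectionError

   _UNSET = object()


   def getjson(parser, section, key, fallback=_UNSET):
       """Parse a config value as JSON; ``fallback`` is returned verbatim, unparsed."""
       try:
           data = parser.get(section, key)
       except (NoSectionError, NoOptionError):
           if fallback is _UNSET:
               raise
           return fallback

       # Explicit length check: clearer than `if not data`, and only ever
       # true for an actual empty string
       if len(data) == 0:
           return fallback if fallback is not _UNSET else None

       return json.loads(data)


   cfg = ConfigParser()
   cfg.read_string('[database]\nbatch_size = {"page_size": 500}\nempty =\n')
   ```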



