Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/03/09 23:16:27 UTC

[GitHub] [airflow] mik-laj opened a new pull request #7674: Simplify DagFileProcessor.process_file method

mik-laj opened a new pull request #7674: Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674
 
 
   **NOTE FOR REVIEWERS:** This PR contains several commits. Please review each commit in turn.
   
   This method is too long, so I made the following changes (a rough sketch of the resulting `process_file` flow follows the commit list):
   ```
   708872191 Filter active DAGs only once
   aa38e5afb Extract _prepare_simple_dags method
   67f6f388d Extract _schedule_task_instances method
   ```
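   
   A rough sketch of how `process_file` reads after these extractions -- simplified from the diffs quoted later in this thread; the signature and some details are assumptions, so treat it as an illustration rather than the merged code:
   
   ```python
   def process_file(self, file_path, pickle_dags=False, session=None):
       dagbag = models.DagBag(file_path, include_examples=False)

       # Filter active (unpaused) DAGs only once
       paused_dag_ids = DagModel.get_paused_dag_ids(dag_ids=dagbag.dag_ids)
       unpaused_dags = [dag for dag_id, dag in dagbag.dags.items()
                        if dag_id not in paused_dag_ids]

       # Extracted helper: pickle the DAGs (if requested) and wrap them in SimpleDag
       simple_dags = self._prepare_simple_dags(unpaused_dags, pickle_dags, session)

       dags = self._find_dags_to_process(unpaused_dags)
       ti_keys_to_schedule = self._process_dags(dags, session)

       # Extracted helper: schedule the task instance keys found above
       self._schedule_task_instances(dagbag, ti_keys_to_schedule, session)

       # Record import errors into the ORM
       try:
           self.update_import_errors(session, dagbag)
       except Exception:  # pylint: disable=broad-except
           self.log.exception("Error logging import errors!")

       return simple_dags, len(dagbag.import_errors)
   ```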
   
   ---
   Issue link: WILL BE INSERTED BY [boring-cyborg](https://github.com/kaxil/boring-cyborg)
   
   Make sure to mark the boxes below before creating PR: [x]
   
   - [X] Description above provides context of the change
   - [X] Commit message/PR title starts with `[AIRFLOW-NNNN]`. AIRFLOW-NNNN = JIRA ID<sup>*</sup>
   - [X] Unit tests coverage for changes (not needed for documentation changes)
   - [X] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [X] Relevant documentation is updated including usage instructions.
   - [X] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   <sup>*</sup> For document-only changes commit message can start with `[AIRFLOW-XXXX]`.
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


[GitHub] [airflow] codecov-io edited a comment on issue #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674#issuecomment-597902794
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7674?src=pr&el=h1) Report
   > Merging [#7674](https://codecov.io/gh/apache/airflow/pull/7674?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/5be3b31bda4e9ebd8d14cefae27f422edcc332b7?src=pr&el=desc) will **decrease** coverage by `22.27%`.
   > The diff coverage is `75.28%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7674/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7674?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff             @@
   ##           master    #7674       +/-   ##
   ===========================================
   - Coverage   86.84%   64.56%   -22.28%     
   ===========================================
     Files         897      903        +6     
     Lines       42806    43727      +921     
   ===========================================
   - Hits        37173    28231     -8942     
   - Misses       5633    15496     +9863
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7674?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...ders/google/cloud/example\_dags/example\_dataflow.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvZ29vZ2xlL2Nsb3VkL2V4YW1wbGVfZGFncy9leGFtcGxlX2RhdGFmbG93LnB5) | `0% <ø> (-100%)` | :arrow_down: |
   | [airflow/models/connection.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvY29ubmVjdGlvbi5weQ==) | `75.35% <ø> (-19.72%)` | :arrow_down: |
   | [...s/google/cloud/example\_dags/example\_datacatalog.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvZ29vZ2xlL2Nsb3VkL2V4YW1wbGVfZGFncy9leGFtcGxlX2RhdGFjYXRhbG9nLnB5) | `0% <0%> (ø)` | |
   | [...providers/google/cloud/example\_dags/example\_dlp.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvZ29vZ2xlL2Nsb3VkL2V4YW1wbGVfZGFncy9leGFtcGxlX2RscC5weQ==) | `0% <0%> (-100%)` | :arrow_down: |
   | [...s/google/cloud/example\_dags/example\_stackdriver.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvZ29vZ2xlL2Nsb3VkL2V4YW1wbGVfZGFncy9leGFtcGxlX3N0YWNrZHJpdmVyLnB5) | `0% <0%> (-100%)` | :arrow_down: |
   | [airflow/api/common/experimental/pool.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9wb29sLnB5) | `25.53% <0%> (-74.47%)` | :arrow_down: |
   | [airflow/www/views.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy93d3cvdmlld3MucHk=) | `25.63% <0%> (-50.5%)` | :arrow_down: |
   | [.../example\_dags/example\_spark\_kubernetes\_operator.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvY25jZi9rdWJlcm5ldGVzL2V4YW1wbGVfZGFncy9leGFtcGxlX3NwYXJrX2t1YmVybmV0ZXNfb3BlcmF0b3IucHk=) | `0% <0%> (ø)` | |
   | [...oviders/google/cloud/example\_dags/example\_tasks.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvZ29vZ2xlL2Nsb3VkL2V4YW1wbGVfZGFncy9leGFtcGxlX3Rhc2tzLnB5) | `0% <0%> (-100%)` | :arrow_down: |
   | [...viders/cncf/kubernetes/sensors/spark\_kubernetes.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvY25jZi9rdWJlcm5ldGVzL3NlbnNvcnMvc3Bhcmtfa3ViZXJuZXRlcy5weQ==) | `100% <100%> (ø)` | |
   | ... and [518 more](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7674?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7674?src=pr&el=footer). Last update [5be3b31...4ea2043](https://codecov.io/gh/apache/airflow/pull/7674?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [airflow] ashb commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674#discussion_r390259132
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -905,6 +898,28 @@ def process_file(
 
         return simple_dags, len(dagbag.import_errors)
 
+    @provide_session
+    def _prepare_simple_dags(self, dags: List[DAG], pickle_dags: bool, session) -> List[SimpleDag]:
+        """
+        Convert DAGS to  SimpleDags. If necessary, it also Pickle the DAGs
+
+        :param dags: List of DAGs
+        :param pickle_dags: whether serialize the DAGs found in the file and
+            save them to the db
+        :type pickle_dags: bool
+        :return: List of SimpleDag
+        :rtype: List[airflow.utils.dag_processing.SimpleDag]
+        """
+
+        simple_dags = []
+        # Pickle the DAGs (if necessary) and put them into a SimpleDag
+        for dag in dags:
+            pickle_id = None
+            if pickle_dags:
+                pickle_id = dag.pickle(session).id
 
 Review comment:
   I'd say no to a list comprehension here, as it would be quite large because of the `if pickle_dags` condition/block.


[GitHub] [airflow] kaxil commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674#discussion_r391227897
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -845,11 +845,11 @@ def process_file(
 
         paused_dag_ids = DagModel.get_paused_dag_ids(dag_ids=dagbag.dag_ids)
 
-        active_dags = [dag for dag_id, dag in dagbag.dags.items() if dag_id not in paused_dag_ids]
+        unpaaused_dags = [dag for dag_id, dag in dagbag.dags.items() if dag_id not in paused_dag_ids]
 
-        simple_dags = self._prepare_simple_dags(active_dags, pickle_dags, session)
+        simple_dags = self._prepare_simple_dags(unpaaused_dags, pickle_dags, session)
 
 Review comment:
   ```suggestion
           simple_dags = self._prepare_simple_dags(unpaused_dags, pickle_dags, session)
   ```


[GitHub] [airflow] mik-laj commented on issue #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
mik-laj commented on issue #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674#issuecomment-597174623
 
 
   Travis is green again.


[GitHub] [airflow] mik-laj commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674#discussion_r390285717
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -747,21 +747,16 @@ def _process_dags(self, dags: List[DAG], session=None):
 
         return tis_out
 
-    def _find_dags_to_process(self, dags: List[DAG], paused_dag_ids: Set[str]) -> List[DAG]:
+    def _find_dags_to_process(self, dags: List[DAG]) -> List[DAG]:
         """
         Find the DAGs that are not paused to process.
 
         :param dags: specified DAGs
-        :param paused_dag_ids: paused DAG IDs
         :return: DAGs to process
         """
         if len(self.dag_ids) > 0:
 
 Review comment:
   Done.


[GitHub] [airflow] mik-laj commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674#discussion_r391221756
 
 

 ##########
 File path: tests/jobs/test_scheduler_job.py
 ##########
 @@ -1057,6 +1057,31 @@ def test_process_file_should_failure_callback(self):
             self.assertEqual("Callback fired", content)
             os.remove(callback_file.name)
 
+    def test_should_parse_only_active_dags(self):
 
 Review comment:
   Updated. 


[GitHub] [airflow] mik-laj commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674#discussion_r390267914
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -747,21 +747,16 @@ def _process_dags(self, dags: List[DAG], session=None):
 
         return tis_out
 
-    def _find_dags_to_process(self, dags: List[DAG], paused_dag_ids: Set[str]) -> List[DAG]:
+    def _find_dags_to_process(self, dags: List[DAG]) -> List[DAG]:
         """
         Find the DAGs that are not paused to process.
 
         :param dags: specified DAGs
-        :param paused_dag_ids: paused DAG IDs
         :return: DAGs to process
         """
         if len(self.dag_ids) > 0:
 
 Review comment:
   I prefer to limit additional changes when I am making changes to the core; extra changes make the review more difficult.


[GitHub] [airflow] codecov-io edited a comment on issue #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674#issuecomment-597902794
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7674?src=pr&el=h1) Report
   > Merging [#7674](https://codecov.io/gh/apache/airflow/pull/7674?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/5be3b31bda4e9ebd8d14cefae27f422edcc332b7&el=desc) will **decrease** coverage by `0.62%`.
   > The diff coverage is `86.66%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7674/graphs/tree.svg?width=650&height=150&src=pr&token=WdLKlKHOAU)](https://codecov.io/gh/apache/airflow/pull/7674?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #7674      +/-   ##
   ==========================================
   - Coverage   86.84%   86.21%   -0.63%     
   ==========================================
     Files         897      904       +7     
     Lines       42806    43740     +934     
   ==========================================
   + Hits        37173    37710     +537     
   - Misses       5633     6030     +397     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7674?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/jobs/scheduler\_job.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy9qb2JzL3NjaGVkdWxlcl9qb2IucHk=) | `90.88% <86.66%> (+0.22%)` | :arrow_up: |
   | [...flow/providers/apache/cassandra/hooks/cassandra.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvYXBhY2hlL2Nhc3NhbmRyYS9ob29rcy9jYXNzYW5kcmEucHk=) | `21.51% <0.00%> (-72.16%)` | :arrow_down: |
   | [...w/providers/apache/hive/operators/mysql\_to\_hive.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvYXBhY2hlL2hpdmUvb3BlcmF0b3JzL215c3FsX3RvX2hpdmUucHk=) | `35.84% <0.00%> (-64.16%)` | :arrow_down: |
   | [airflow/kubernetes/volume\_mount.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZV9tb3VudC5weQ==) | `44.44% <0.00%> (-55.56%)` | :arrow_down: |
   | [airflow/providers/postgres/operators/postgres.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvcG9zdGdyZXMvb3BlcmF0b3JzL3Bvc3RncmVzLnB5) | `50.00% <0.00%> (-50.00%)` | :arrow_down: |
   | [airflow/providers/redis/operators/redis\_publish.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvcmVkaXMvb3BlcmF0b3JzL3JlZGlzX3B1Ymxpc2gucHk=) | `50.00% <0.00%> (-50.00%)` | :arrow_down: |
   | [airflow/kubernetes/volume.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZS5weQ==) | `52.94% <0.00%> (-47.06%)` | :arrow_down: |
   | [airflow/providers/mongo/sensors/mongo.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvbW9uZ28vc2Vuc29ycy9tb25nby5weQ==) | `53.33% <0.00%> (-46.67%)` | :arrow_down: |
   | [airflow/kubernetes/pod\_launcher.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3BvZF9sYXVuY2hlci5weQ==) | `47.18% <0.00%> (-45.08%)` | :arrow_down: |
   | [airflow/providers/mysql/operators/mysql.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvbXlzcWwvb3BlcmF0b3JzL215c3FsLnB5) | `55.00% <0.00%> (-45.00%)` | :arrow_down: |
   | ... and [33 more](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7674?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7674?src=pr&el=footer). Last update [5be3b31...4ea2043](https://codecov.io/gh/apache/airflow/pull/7674?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [airflow] ashb commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674#discussion_r390260237
 
 

 ##########
 File path: tests/jobs/test_scheduler_job.py
 ##########
 @@ -842,21 +842,12 @@ def test_find_dags_to_run_includes_subdags(self):
         print(self.dagbag.dag_folder)
         self.assertGreater(len(dag.subdags), 0)
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
-        dags = dag_file_processor._find_dags_to_process(self.dagbag.dags.values(), paused_dag_ids=())
+        dags = dag_file_processor._find_dags_to_process(self.dagbag.dags.values())
 
         self.assertIn(dag, dags)
         for subdag in dag.subdags:
             self.assertIn(subdag, dags)
 
-    def test_find_dags_to_run_skip_paused_dags(self):
-        dagbag = DagBag(include_examples=False)
-
-        dag = dagbag.get_dag('test_subdag_operator')
 
 Review comment:
   Although the interface has changed, we do want to check it still skips over paused dags -- do we have a test that covers that still?


[GitHub] [airflow] kaxil commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674#discussion_r391228009
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -845,11 +845,11 @@ def process_file(
 
         paused_dag_ids = DagModel.get_paused_dag_ids(dag_ids=dagbag.dag_ids)
 
-        active_dags = [dag for dag_id, dag in dagbag.dags.items() if dag_id not in paused_dag_ids]
+        unpaaused_dags = [dag for dag_id, dag in dagbag.dags.items() if dag_id not in paused_dag_ids]
 
-        simple_dags = self._prepare_simple_dags(active_dags, pickle_dags, session)
+        simple_dags = self._prepare_simple_dags(unpaaused_dags, pickle_dags, session)
 
-        dags = self._find_dags_to_process(active_dags)
+        dags = self._find_dags_to_process(unpaaused_dags)
 
 Review comment:
   ```suggestion
           dags = self._find_dags_to_process(unpaused_dags)
   ```


[GitHub] [airflow] mik-laj commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674#discussion_r391266639
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -845,11 +845,11 @@ def process_file(
 
         paused_dag_ids = DagModel.get_paused_dag_ids(dag_ids=dagbag.dag_ids)
 
-        active_dags = [dag for dag_id, dag in dagbag.dags.items() if dag_id not in paused_dag_ids]
+        unpaaused_dags = [dag for dag_id, dag in dagbag.dags.items() if dag_id not in paused_dag_ids]
 
-        simple_dags = self._prepare_simple_dags(active_dags, pickle_dags, session)
+        simple_dags = self._prepare_simple_dags(unpaaused_dags, pickle_dags, session)
 
-        dags = self._find_dags_to_process(active_dags)
+        dags = self._find_dags_to_process(unpaaused_dags)
 
 Review comment:
   Fixed. Thanks.


[GitHub] [airflow] mik-laj merged pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
mik-laj merged pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674
 
 
   


[GitHub] [airflow] ashb commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674#discussion_r391159188
 
 

 ##########
 File path: tests/jobs/test_scheduler_job.py
 ##########
 @@ -1057,6 +1057,31 @@ def test_process_file_should_failure_callback(self):
             self.assertEqual("Callback fired", content)
             os.remove(callback_file.name)
 
+    def test_should_parse_only_active_dags(self):
 
 Review comment:
   :+1:  from me after this change is made


[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
nuclearpinguin commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674#discussion_r390246851
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -905,6 +898,28 @@ def process_file(
 
         return simple_dags, len(dagbag.import_errors)
 
+    @provide_session
+    def _prepare_simple_dags(self, dags: List[DAG], pickle_dags: bool, session) -> List[SimpleDag]:
+        """
+        Convert DAGS to  SimpleDags. If necessary, it also Pickle the DAGs
+
+        :param dags: List of DAGs
+        :param pickle_dags: whether serialize the DAGs found in the file and
+            save them to the db
+        :type pickle_dags: bool
+        :return: List of SimpleDag
+        :rtype: List[airflow.utils.dag_processing.SimpleDag]
+        """
+
+        simple_dags = []
+        # Pickle the DAGs (if necessary) and put them into a SimpleDag
+        for dag in dags:
+            pickle_id = None
+            if pickle_dags:
+                pickle_id = dag.pickle(session).id
 
 Review comment:
   ```suggestion
                   pickle_id = dag.pickle(session).id if pickle_dags else None
   ```
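   
   For context, a minimal sketch of how the loop body would read with that suggestion applied (illustrative only, not the merged code):
   
   ```python
   simple_dags = []
   # Pickle the DAGs (if necessary) and put them into a SimpleDag
   for dag in dags:
       pickle_id = dag.pickle(session).id if pickle_dags else None
       simple_dags.append(SimpleDag(dag, pickle_id=pickle_id))
   ```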


[GitHub] [airflow] mik-laj commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674#discussion_r391266670
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -845,11 +845,11 @@ def process_file(
 
         paused_dag_ids = DagModel.get_paused_dag_ids(dag_ids=dagbag.dag_ids)
 
-        active_dags = [dag for dag_id, dag in dagbag.dags.items() if dag_id not in paused_dag_ids]
+        unpaaused_dags = [dag for dag_id, dag in dagbag.dags.items() if dag_id not in paused_dag_ids]
 
-        simple_dags = self._prepare_simple_dags(active_dags, pickle_dags, session)
+        simple_dags = self._prepare_simple_dags(unpaaused_dags, pickle_dags, session)
 
 Review comment:
   Fixed. Thanks.


[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
nuclearpinguin commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674#discussion_r390245689
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -747,21 +747,16 @@ def _process_dags(self, dags: List[DAG], session=None):
 
         return tis_out
 
-    def _find_dags_to_process(self, dags: List[DAG], paused_dag_ids: Set[str]) -> List[DAG]:
+    def _find_dags_to_process(self, dags: List[DAG]) -> List[DAG]:
         """
         Find the DAGs that are not paused to process.
 
         :param dags: specified DAGs
-        :param paused_dag_ids: paused DAG IDs
         :return: DAGs to process
         """
         if len(self.dag_ids) > 0:
 
 Review comment:
   ```suggestion
           if self.dag_ids:
   ```
   This should fix a pylint error that will be found one day :)
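   
   A quick illustration of the two forms (sketch only, with placeholder bodies; for a list the checks are equivalent, and the truthiness form is the one pylint's len-as-condition check prefers):
   
   ```python
   dag_ids = []                  # an empty list is falsy
   if len(dag_ids) > 0:          # current form, flagged by pylint's len-as-condition check
       print("non-empty")
   if dag_ids:                   # suggested form, equivalent for lists
       print("non-empty")
   ```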


[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
nuclearpinguin commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674#discussion_r390247701
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -905,6 +898,28 @@ def process_file(
 
         return simple_dags, len(dagbag.import_errors)
 
+    @provide_session
+    def _prepare_simple_dags(self, dags: List[DAG], pickle_dags: bool, session) -> List[SimpleDag]:
+        """
+        Convert DAGS to  SimpleDags. If necessary, it also Pickle the DAGs
+
+        :param dags: List of DAGs
+        :param pickle_dags: whether serialize the DAGs found in the file and
+            save them to the db
+        :type pickle_dags: bool
+        :return: List of SimpleDag
+        :rtype: List[airflow.utils.dag_processing.SimpleDag]
+        """
+
+        simple_dags = []
+        # Pickle the DAGs (if necessary) and put them into a SimpleDag
+        for dag in dags:
+            pickle_id = None
+            if pickle_dags:
+                pickle_id = dag.pickle(session).id
 
 Review comment:
   There's even a chance to use a list comprehension here, but I'm not sure it would be more readable. If it would, maybe a generator expression?
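   
   For reference, a sketch of what that comprehension could look like (illustrative only; it packs the pickle condition into one fairly dense expression, which is the readability concern raised above):
   
   ```python
   simple_dags = [
       SimpleDag(dag, pickle_id=dag.pickle(session).id if pickle_dags else None)
       for dag in dags
   ]
   ```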


[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
nuclearpinguin commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674#discussion_r390246266
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -852,16 +847,16 @@ def process_file(
 
         paused_dag_ids = DagModel.get_paused_dag_ids(dag_ids=dagbag.dag_ids)
 
+        active_dags = [dag for dag_id, dag in dagbag.dags.items() if dag_id not in paused_dag_ids]
+
         # Pickle the DAGs (if necessary) and put them into a SimpleDag
-        for dag_id, dag in dagbag.dags.items():
-            # Only return DAGs that are not paused
-            if dag_id not in paused_dag_ids:
-                pickle_id = None
-                if pickle_dags:
-                    pickle_id = dag.pickle(session).id
-                simple_dags.append(SimpleDag(dag, pickle_id=pickle_id))
-
-        dags = self._find_dags_to_process(dagbag.dags.values(), paused_dag_ids)
+        for dag in active_dags:
+            pickle_id = None
+            if pickle_dags:
+                pickle_id = dag.pickle(session).id
 
 Review comment:
   ```suggestion
                   pickle_id = dag.pickle(session).id if pickle_dags else None
   ```


[GitHub] [airflow] mik-laj commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674#discussion_r390304791
 
 

 ##########
 File path: tests/jobs/test_scheduler_job.py
 ##########
 @@ -842,21 +842,12 @@ def test_find_dags_to_run_includes_subdags(self):
         print(self.dagbag.dag_folder)
         self.assertGreater(len(dag.subdags), 0)
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
-        dags = dag_file_processor._find_dags_to_process(self.dagbag.dags.values(), paused_dag_ids=())
+        dags = dag_file_processor._find_dags_to_process(self.dagbag.dags.values())
 
         self.assertIn(dag, dags)
         for subdag in dag.subdags:
             self.assertIn(subdag, dags)
 
-    def test_find_dags_to_run_skip_paused_dags(self):
-        dagbag = DagBag(include_examples=False)
-
-        dag = dagbag.get_dag('test_subdag_operator')
 
 Review comment:
   We need to remove it because we are now checking for active DAGs in the process_file method. Here is the commit: https://github.com/apache/airflow/pull/7674/commits/708872191af53a81cf74a9be3ea2309db0b8a748
   I added new tests:
   https://github.com/apache/airflow/pull/7674/commits/7232a990d7648052d6e18ddb6ba38342ec0705c7
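   
   For readers following the thread, a hypothetical sketch of the kind of test referenced above -- the `process_file` call, `TEST_DAGS_FOLDER`, and the patching approach are assumptions for illustration, not the code from the linked commits:
   
   ```python
   def test_should_parse_only_unpaused_dags(self):
       dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
       # Pretend the DAG defined in the file is paused
       with mock.patch.object(DagModel, 'get_paused_dag_ids',
                              return_value={'test_subdag_operator'}):
           simple_dags, import_errors_count = dag_file_processor.process_file(
               os.path.join(TEST_DAGS_FOLDER, 'test_subdag_operator.py'),
               pickle_dags=False,
           )
       # A paused DAG should not be returned as a SimpleDag
       self.assertEqual(import_errors_count, 0)
       self.assertNotIn('test_subdag_operator',
                        [simple_dag.dag_id for simple_dag in simple_dags])
   ```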


[GitHub] [airflow] mik-laj commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674#discussion_r390285608
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -852,16 +847,16 @@ def process_file(
 
         paused_dag_ids = DagModel.get_paused_dag_ids(dag_ids=dagbag.dag_ids)
 
+        active_dags = [dag for dag_id, dag in dagbag.dags.items() if dag_id not in paused_dag_ids]
+
         # Pickle the DAGs (if necessary) and put them into a SimpleDag
-        for dag_id, dag in dagbag.dags.items():
-            # Only return DAGs that are not paused
-            if dag_id not in paused_dag_ids:
-                pickle_id = None
-                if pickle_dags:
-                    pickle_id = dag.pickle(session).id
-                simple_dags.append(SimpleDag(dag, pickle_id=pickle_id))
-
-        dags = self._find_dags_to_process(dagbag.dags.values(), paused_dag_ids)
+        for dag in active_dags:
+            pickle_id = None
+            if pickle_dags:
+                pickle_id = dag.pickle(session).id
 
 Review comment:
   Done.


[GitHub] [airflow] codecov-io commented on issue #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
codecov-io commented on issue #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674#issuecomment-597902794
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7674?src=pr&el=h1) Report
   > Merging [#7674](https://codecov.io/gh/apache/airflow/pull/7674?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/5be3b31bda4e9ebd8d14cefae27f422edcc332b7&el=desc) will **decrease** coverage by `22.27%`.
   > The diff coverage is `75.28%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7674/graphs/tree.svg?width=650&height=150&src=pr&token=WdLKlKHOAU)](https://codecov.io/gh/apache/airflow/pull/7674?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff             @@
   ##           master    #7674       +/-   ##
   ===========================================
   - Coverage   86.84%   64.56%   -22.28%     
   ===========================================
     Files         897      903        +6     
     Lines       42806    43727      +921     
   ===========================================
   - Hits        37173    28231     -8942     
   - Misses       5633    15496     +9863     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7674?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/api/common/experimental/pool.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9wb29sLnB5) | `25.53% <0.00%> (-74.47%)` | :arrow_down: |
   | [airflow/models/connection.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvY29ubmVjdGlvbi5weQ==) | `75.35% <ø> (-19.72%)` | :arrow_down: |
   | [.../example\_dags/example\_spark\_kubernetes\_operator.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvY25jZi9rdWJlcm5ldGVzL2V4YW1wbGVfZGFncy9leGFtcGxlX3NwYXJrX2t1YmVybmV0ZXNfb3BlcmF0b3IucHk=) | `0.00% <0.00%> (ø)` | |
   | [...s/google/cloud/example\_dags/example\_datacatalog.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvZ29vZ2xlL2Nsb3VkL2V4YW1wbGVfZGFncy9leGFtcGxlX2RhdGFjYXRhbG9nLnB5) | `0.00% <0.00%> (ø)` | |
   | [...ders/google/cloud/example\_dags/example\_dataflow.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvZ29vZ2xlL2Nsb3VkL2V4YW1wbGVfZGFncy9leGFtcGxlX2RhdGFmbG93LnB5) | `0.00% <ø> (-100.00%)` | :arrow_down: |
   | [...providers/google/cloud/example\_dags/example\_dlp.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvZ29vZ2xlL2Nsb3VkL2V4YW1wbGVfZGFncy9leGFtcGxlX2RscC5weQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...s/google/cloud/example\_dags/example\_stackdriver.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvZ29vZ2xlL2Nsb3VkL2V4YW1wbGVfZGFncy9leGFtcGxlX3N0YWNrZHJpdmVyLnB5) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...oviders/google/cloud/example\_dags/example\_tasks.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvZ29vZ2xlL2Nsb3VkL2V4YW1wbGVfZGFncy9leGFtcGxlX3Rhc2tzLnB5) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [airflow/www/views.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy93d3cvdmlld3MucHk=) | `25.63% <0.00%> (-50.50%)` | :arrow_down: |
   | [airflow/jobs/scheduler\_job.py](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree#diff-YWlyZmxvdy9qb2JzL3NjaGVkdWxlcl9qb2IucHk=) | `14.18% <6.97%> (-76.48%)` | :arrow_down: |
   | ... and [518 more](https://codecov.io/gh/apache/airflow/pull/7674/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7674?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7674?src=pr&el=footer). Last update [5be3b31...4ea2043](https://codecov.io/gh/apache/airflow/pull/7674?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [airflow] ashb commented on issue #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674#issuecomment-597777026
 
 
   Thanks for doing them as fixup commits @mik-laj -- I think we (committers) should get more into the habit of it; it makes re-review easier.


[GitHub] [airflow] kaxil commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674#discussion_r391227796
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -845,11 +845,11 @@ def process_file(
 
         paused_dag_ids = DagModel.get_paused_dag_ids(dag_ids=dagbag.dag_ids)
 
-        active_dags = [dag for dag_id, dag in dagbag.dags.items() if dag_id not in paused_dag_ids]
+        unpaaused_dags = [dag for dag_id, dag in dagbag.dags.items() if dag_id not in paused_dag_ids]
 
 Review comment:
   ```suggestion
           unpaused_dags = [dag for dag_id, dag in dagbag.dags.items() if dag_id not in paused_dag_ids]
   ```


[GitHub] [airflow] ashb commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674#discussion_r390259132
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -905,6 +898,28 @@ def process_file(
 
         return simple_dags, len(dagbag.import_errors)
 
+    @provide_session
+    def _prepare_simple_dags(self, dags: List[DAG], pickle_dags: bool, session) -> List[SimpleDag]:
+        """
+        Convert DAGS to  SimpleDags. If necessary, it also Pickle the DAGs
+
+        :param dags: List of DAGs
+        :param pickle_dags: whether serialize the DAGs found in the file and
+            save them to the db
+        :type pickle_dags: bool
+        :return: List of SimpleDag
+        :rtype: List[airflow.utils.dag_processing.SimpleDag]
+        """
+
+        simple_dags = []
+        # Pickle the DAGs (if necessary) and put them into a SimpleDag
+        for dag in dags:
+            pickle_id = None
+            if pickle_dags:
+                pickle_id = dag.pickle(session).id
 
 Review comment:
   I'd say no to a list comprehension here, as it would be quite large because of the `if pickle_dags` condition/block. (And large list comprehensions generally add more complexity than they are worth in terms of readability.)


[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
nuclearpinguin commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674#discussion_r390248231
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -853,6 +853,32 @@ def process_file(
 
         ti_keys_to_schedule = self._process_dags(dags, session)
 
+        self._schedule_task_instances(dagbag, ti_keys_to_schedule, session)
+
+        # Record import errors into the ORM
+        try:
+            self.update_import_errors(session, dagbag)
+        except Exception:  # pylint: disable=broad-except
+            self.log.exception("Error logging import errors!")
+
+        return simple_dags, len(dagbag.import_errors)
+
+    @provide_session
+    def _schedule_task_instances(
+        self,
+        dagbag: models.DagBag,
 
 Review comment:
   ```suggestion
           dagbag: DagBag,
   ```
   Can we use an explicit type?
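   
   For clarity, a simplified sketch of the two conventions being weighed (not code from the PR):
   
   ```python
   from airflow import models            # module import, the convention already used in this file
   from airflow.models import DagBag     # explicit class import, as suggested
   
   def annotate_with_module(dagbag: models.DagBag) -> None:
       ...
   
   def annotate_with_class(dagbag: DagBag) -> None:
       ...
   ```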


[GitHub] [airflow] mik-laj commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674#discussion_r390268934
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -853,6 +853,32 @@ def process_file(
 
         ti_keys_to_schedule = self._process_dags(dags, session)
 
+        self._schedule_task_instances(dagbag, ti_keys_to_schedule, session)
+
+        # Record import errors into the ORM
+        try:
+            self.update_import_errors(session, dagbag)
+        except Exception:  # pylint: disable=broad-except
+            self.log.exception("Error logging import errors!")
+
+        return simple_dags, len(dagbag.import_errors)
+
+    @provide_session
+    def _schedule_task_instances(
+        self,
+        dagbag: models.DagBag,
 
 Review comment:
   In other places in this file we use this type of import, and I would prefer to keep to one convention. I also prefer to limit additional changes when I am making changes to the core; extra changes make the review more difficult.
   
   


[GitHub] [airflow] ashb commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674#discussion_r391158278
 
 

 ##########
 File path: tests/jobs/test_scheduler_job.py
 ##########
 @@ -1057,6 +1057,31 @@ def test_process_file_should_failure_callback(self):
             self.assertEqual("Callback fired", content)
             os.remove(callback_file.name)
 
+    def test_should_parse_only_active_dags(self):
 
 Review comment:
   ```suggestion
       def test_should_parse_only_unpaused_dags(self):
   ```
   
   Active is another concept (the is_active column).


[GitHub] [airflow] mik-laj commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674#discussion_r390269625
 
 

 ##########
 File path: tests/jobs/test_scheduler_job.py
 ##########
 @@ -842,21 +842,12 @@ def test_find_dags_to_run_includes_subdags(self):
         print(self.dagbag.dag_folder)
         self.assertGreater(len(dag.subdags), 0)
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
-        dags = dag_file_processor._find_dags_to_process(self.dagbag.dags.values(), paused_dag_ids=())
+        dags = dag_file_processor._find_dags_to_process(self.dagbag.dags.values())
 
         self.assertIn(dag, dags)
         for subdag in dag.subdags:
             self.assertIn(subdag, dags)
 
-    def test_find_dags_to_run_skip_paused_dags(self):
-        dagbag = DagBag(include_examples=False)
-
-        dag = dagbag.get_dag('test_subdag_operator')
 
 Review comment:
   I thought that @kaxil had already added such a test, but I will check it and add it if it is not there yet.


[GitHub] [airflow] kaxil commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7674: [AIRFLOW-7022] Simplify DagFileProcessor.process_file method
URL: https://github.com/apache/airflow/pull/7674#discussion_r390274618
 
 

 ##########
 File path: tests/jobs/test_scheduler_job.py
 ##########
 @@ -842,21 +842,12 @@ def test_find_dags_to_run_includes_subdags(self):
         print(self.dagbag.dag_folder)
         self.assertGreater(len(dag.subdags), 0)
         dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
-        dags = dag_file_processor._find_dags_to_process(self.dagbag.dags.values(), paused_dag_ids=())
+        dags = dag_file_processor._find_dags_to_process(self.dagbag.dags.values())
 
         self.assertIn(dag, dags)
         for subdag in dag.subdags:
             self.assertIn(subdag, dags)
 
-    def test_find_dags_to_run_skip_paused_dags(self):
-        dagbag = DagBag(include_examples=False)
-
-        dag = dagbag.get_dag('test_subdag_operator')
 
 Review comment:
   We refactored a method in https://github.com/apache/airflow/pull/7587 and added a test to check the response,
   
   and we relied on the 2 existing tests -- see the comment here: https://github.com/apache/airflow/pull/7587#issuecomment-592527753
   
   This test was one of them -- we don't need to delete this test, do we?
   
