Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/10/14 15:16:49 UTC

[GitHub] [airflow] pavansharma36 opened a new pull request, #27060: 27010 : Add new files to parsing queue on every loop of dag processsing

pavansharma36 opened a new pull request, #27060:
URL: https://github.com/apache/airflow/pull/27060

   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of an existing issue, reference it using one of the following:
   
   closes: 27010
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   Whenever file paths are refreshed (every `dag_dir_list_interval`), any newly added files are put at the front of the parsing queue so they are parsed immediately, instead of waiting for the existing parsing queue to finish.
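The idea can be sketched as a standalone function. This is a minimal sketch, not the PR's code: `add_new_file_paths_to_queue` is a hypothetical helper, `file_stats` is a plain dict standing in for the manager's `_file_stats` (with a placeholder dict instead of `DagFileStat`), and the queue is assumed to be a `collections.deque`.

```python
from collections import deque


def add_new_file_paths_to_queue(file_paths, file_stats, file_path_queue):
    """Prepend never-seen file paths to the parsing queue (hypothetical helper)."""
    for file_path in file_paths:
        if file_path not in file_stats:
            # No stats yet: this file was just found by the directory refresh.
            file_stats[file_path] = {"run_count": 0}  # placeholder for DagFileStat
            # Put it at the front so it is parsed before the existing backlog.
            file_path_queue.appendleft(file_path)
    return file_path_queue


stats = {"a.py": {"run_count": 1}, "b.py": {"run_count": 1}}
queue = deque(["a.py", "b.py"])
add_new_file_paths_to_queue(["a.py", "b.py", "new_dag.py"], stats, queue)
print(list(queue))  # ['new_dag.py', 'a.py', 'b.py']
```

Because each new file is prepended individually, several files found in the same refresh end up at the front of the queue in reverse order of discovery.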
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code changes, an Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in a newsfragment file, named `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in [newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] uranusjr commented on a diff in pull request #27060: 27010 : Add new files to parsing queue on every loop of dag processsing

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #27060:
URL: https://github.com/apache/airflow/pull/27060#discussion_r1018141028


##########
tests/dag_processing/test_manager.py:
##########
@@ -300,9 +301,11 @@ def test_file_paths_in_queue_sorted_alphabetically(
         )
 
         manager.set_file_paths(dag_files)
-        assert manager._file_path_queue == []
+        assert manager._file_path_queue == collections.deque([])

Review Comment:
   Don’t need the `[]`, just `collections.deque()` would work.
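Why the assertion has to change at all: a `deque` never compares equal to a `list`, even when both are empty, so once `_file_path_queue` becomes a deque the old `== []` check would fail. A quick check in plain Python:

```python
import collections

queue = collections.deque()

# A deque does not compare equal to a list, even when both are empty.
print(queue == [])                     # False
# Comparing against another deque works, with or without the [] argument.
print(queue == collections.deque([]))  # True
print(queue == collections.deque())    # True
```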





[GitHub] [airflow] ashb commented on pull request #27060: 27010 : Add new files to parsing queue on every loop of dag processsing

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #27060:
URL: https://github.com/apache/airflow/pull/27060#issuecomment-1309005177

   I'll look at the code in a moment, but how is this different from setting the existing dag dir refresh interval config to 0?




[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #27060: 27010 : Add new files to parsing queue on every loop of dag processsing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on code in PR #27060:
URL: https://github.com/apache/airflow/pull/27060#discussion_r996806805


##########
airflow/dag_processing/manager.py:
##########
@@ -760,6 +762,10 @@ def _refresh_dag_dir(self):
 
             DagCode.remove_deleted_code(dag_filelocs)
 
+            return True
+        else:
+            return False

Review Comment:
   ```suggestion
               return True
           return False
   ```





[GitHub] [airflow] ashb commented on pull request #27060: 27010 : Add new files to parsing queue on every loop of dag processsing

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #27060:
URL: https://github.com/apache/airflow/pull/27060#issuecomment-1309045599

   Got it thanks! (I was on my phone and it was difficult to read the code fully there)




[GitHub] [airflow] potiuk commented on pull request #27060: 27010 : Add new files to parsing queue on every loop of dag processsing

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #27060:
URL: https://github.com/apache/airflow/pull/27060#issuecomment-1288206307

   I saw quite a few users complaining recently about similar behaviour, so it would be great to review and merge it before 2.4.3. 
   
   Deque should be fine when it comes to performance (at least all the comparisons and micro-benchmarks I saw confirm that). But I think we should review this one carefully, also taking into account the processing order the user can choose. 
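To make the ordering concern concrete: assuming the backlog has already been ordered according to the user's `[scheduler] file_parsing_sort_mode` (alphabetical in this sketch), prepending new files puts them ahead of that order. The file names are illustrative only.

```python
import collections

# Hypothetical backlog already ordered by the chosen sort mode (alphabetical here).
backlog = sorted(["c.py", "a.py", "b.py"])
queue = collections.deque(backlog)

# Files discovered later are prepended, so they are parsed ahead of the
# sorted backlog regardless of which sort mode produced it.
for new_file in ["z_new.py", "m_new.py"]:
    queue.appendleft(new_file)

print(list(queue))  # ['m_new.py', 'z_new.py', 'a.py', 'b.py', 'c.py']
```

Note that the new files themselves come out in reverse order of discovery rather than in the configured sort order.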




[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #27060: 27010 : Add new files to parsing queue on every loop of dag processsing

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on code in PR #27060:
URL: https://github.com/apache/airflow/pull/27060#discussion_r996253234


##########
airflow/dag_processing/manager.py:
##########
@@ -1041,6 +1047,18 @@ def start_new_processes(self):
             self._processors[file_path] = processor
             self.waitables[processor.waitable_handle] = processor
 
+
+    def add_new_file_path_to_queue(self):
+        for file_path in self.file_paths:
+            if file_path not in self._file_stats:
+                # We found new file after refreshing dir. add to parsing queue at start
+                self.log.info('Adding new file %s to parsing queue', file_path)
+                self._file_stats[file_path] = DagFileStat(
+                    num_dags=0, import_errors=0, last_finish_time=None, last_duration=None, run_count=0
+                )
+                self._file_path_queue.insert(0, file_path)

Review Comment:
   List `insert` is `O(n)`. 
   If this change is desired, we should use `collections.deque` for `_file_path_queue`
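A rough illustration of the complexity difference. This is a sketch rather than a benchmark of the DAG processor itself; `prepend_to_list`, `prepend_to_deque` and the queue size are hypothetical, and the timings will vary by machine.

```python
import collections
import timeit

N = 10_000  # hypothetical queue size, for illustration only


def prepend_to_list(n):
    queue = []
    for i in range(n):
        queue.insert(0, i)  # shifts every existing element: O(len(queue))
    return queue


def prepend_to_deque(n):
    queue = collections.deque()
    for i in range(n):
        queue.appendleft(i)  # O(1) at either end of a deque
    return queue


# Same resulting order...
assert list(prepend_to_deque(1_000)) == prepend_to_list(1_000)

# ...but the list version's total cost grows quadratically with n.
print("list.insert(0, x):  ", timeit.timeit(lambda: prepend_to_list(N), number=1))
print("deque.appendleft(x):", timeit.timeit(lambda: prepend_to_deque(N), number=1))
```

`deque` also provides `insert(index, x)`, so existing `insert(0, ...)` call sites such as the one in `_add_callback_to_queue` would keep working after a type change, although `appendleft` states the intent more directly.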





[GitHub] [airflow] boring-cyborg[bot] commented on pull request #27060: 27010 : Add new files to parsing queue on every loop of dag processsing

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on PR #27060:
URL: https://github.com/apache/airflow/pull/27060#issuecomment-1279143198

   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, mypy and type annotations). Our [pre-commits]( https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature, add useful documentation (in docstrings or in the `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using [Breeze environment](https://github.com/apache/airflow/blob/main/BREEZE.rst) for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style]( https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   




[GitHub] [airflow] potiuk commented on pull request #27060: 27010 : Add new files to parsing queue on every loop of dag processsing

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #27060:
URL: https://github.com/apache/airflow/pull/27060#issuecomment-1296473821

   Conflicts need resolving, and some benchmarks would be cool. Also cc @ashb for comments.




[GitHub] [airflow] pavansharma36 commented on a diff in pull request #27060: 27010 : Add new files to parsing queue on every loop of dag processsing

Posted by GitBox <gi...@apache.org>.
pavansharma36 commented on code in PR #27060:
URL: https://github.com/apache/airflow/pull/27060#discussion_r996308567


##########
airflow/dag_processing/manager.py:
##########
@@ -1041,6 +1047,18 @@ def start_new_processes(self):
             self._processors[file_path] = processor
             self.waitables[processor.waitable_handle] = processor
 
+
+    def add_new_file_path_to_queue(self):
+        for file_path in self.file_paths:
+            if file_path not in self._file_stats:
+                # We found new file after refreshing dir. add to parsing queue at start
+                self.log.info('Adding new file %s to parsing queue', file_path)
+                self._file_stats[file_path] = DagFileStat(
+                    num_dags=0, import_errors=0, last_finish_time=None, last_duration=None, run_count=0
+                )
+                self._file_path_queue.insert(0, file_path)

Review Comment:
   @ephraimbuddy `insert` at index 0 is already used in `_add_callback_to_queue`.
   Should I change the type to `deque`? That will affect multiple places.





[GitHub] [airflow] uranusjr commented on a diff in pull request #27060: 27010 : Add new files to parsing queue on every loop of dag processsing

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #27060:
URL: https://github.com/apache/airflow/pull/27060#discussion_r1017499028


##########
airflow/dag_processing/manager.py:
##########
@@ -932,7 +937,7 @@ def set_file_paths(self, new_file_paths):
         :return: None
         """
         self._file_paths = new_file_paths
-        self._file_path_queue = [x for x in self._file_path_queue if x in new_file_paths]
+        self._file_path_queue = collections.deque([x for x in self._file_path_queue if x in new_file_paths])

Review Comment:
   ```suggestion
           self._file_path_queue = collections.deque(x for x in self._file_path_queue if x in new_file_paths)
   ```
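The suggestion works because `collections.deque` accepts any iterable, so the generator expression filters the old queue without first building an intermediate list. A small check with illustrative file names:

```python
import collections

file_path_queue = collections.deque(["a.py", "b.py", "removed.py"])
new_file_paths = ["a.py", "b.py", "c.py"]

# Keep only queued paths that still exist after the refresh; the generator
# is consumed directly by deque(), with no throwaway list in between.
filtered = collections.deque(x for x in file_path_queue if x in new_file_paths)
print(filtered)  # deque(['a.py', 'b.py'])
```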





[GitHub] [airflow] potiuk merged pull request #27060: 27010 : Add new files to parsing queue on every loop of dag processsing

Posted by GitBox <gi...@apache.org>.
potiuk merged PR #27060:
URL: https://github.com/apache/airflow/pull/27060




[GitHub] [airflow] pavansharma36 commented on pull request #27060: 27010 : Add new files to parsing queue on every loop of dag processsing

Posted by GitBox <gi...@apache.org>.
pavansharma36 commented on PR #27060:
URL: https://github.com/apache/airflow/pull/27060#issuecomment-1309036169

   @ashb 
   
   The existing implementation adds new files to the parsing queue only when the queue is empty.
   Suppose we set the refresh interval to 5 seconds and have 10k existing DAG files that take around 5 minutes to parse. Even though file paths are refreshed every 5 seconds, new files are not added to the parsing queue until all 10k files have been parsed and the queue is empty.
   
   This change makes sure that, whenever file paths are refreshed on the `dag_dir_list_interval` config, new files are added to the parsing queue without waiting for the existing files to be parsed.
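The difference can be shown with a toy model. This is an assumption-laden sketch, not Airflow's actual scheduling loop: `steps_until_parsed` is hypothetical, each step parses exactly one file, the refresh discovers the new file before the first step, and under the old behaviour only the new file is re-queued once the backlog drains (the real manager would re-queue other files as well).

```python
import collections


def steps_until_parsed(existing, new_file, prepend_new_files):
    """Count parsing steps until ``new_file`` is parsed (toy model)."""
    queue = collections.deque(existing)  # backlog of already-known files
    if prepend_new_files:
        # Behaviour proposed in this PR: the new file jumps the backlog.
        queue.appendleft(new_file)
    steps = 0
    while True:
        if not queue:
            # Old behaviour: new files are only queued once the backlog is empty.
            queue.append(new_file)
        steps += 1
        if queue.popleft() == new_file:
            return steps


existing = [f"dag_{i}.py" for i in range(10_000)]
print(steps_until_parsed(existing, "new_dag.py", prepend_new_files=False))  # 10001
print(steps_until_parsed(existing, "new_dag.py", prepend_new_files=True))   # 1
```

With the 10k-file backlog from the example above, the old path makes the new file wait for the entire backlog, while the prepended file is picked up on the very next step.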

