Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/03/27 02:52:52 UTC

[GitHub] [airflow] kaxil opened a new pull request #15046: Add different modes to sort dag files for parsing

kaxil opened a new pull request #15046:
URL: https://github.com/apache/airflow/pull/15046


   This commit lets users set one of the following modes, which the scheduler uses to
    list and sort the DAG files and decide the parsing order:
   
   - `modified_time`: Sort by the modified time of the files. This is useful at large scale to parse the most recently modified DAGs first.
   - `random_seeded_by_host`: Sort randomly across multiple schedulers, but keep the same order on the same host. This is useful when running the scheduler in HA mode, where each scheduler can parse different DAG files.
   - `alphabetical`: Sort by filename
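
As a rough illustration of the three modes, here is a minimal sketch. It is not the code in this PR: `sort_file_paths` and its `hostname` parameter are hypothetical names used only for this example, and `socket.gethostname()` stands in for whatever hostname lookup the scheduler actually uses.

```python
import os
import random
import socket
from typing import List, Optional


def sort_file_paths(file_paths: List[str], mode: str, hostname: Optional[str] = None) -> List[str]:
    """Return a new list of file paths ordered according to the sort mode."""
    if mode == "modified_time":
        # Most recently modified files come first.
        return sorted(file_paths, key=os.path.getmtime, reverse=True)
    if mode == "alphabetical":
        return sorted(file_paths)
    if mode == "random_seeded_by_host":
        # Seeding the generator with the hostname gives a stable order on
        # one host, and usually a different order on another host.
        seed = hostname if hostname is not None else socket.gethostname()
        shuffled = list(file_paths)  # copy so the caller's list is untouched
        random.Random(seed).shuffle(shuffled)
        return shuffled
    raise ValueError(f"Unknown sort mode: {mode!r}")
```

Calling `sort_file_paths(paths, "random_seeded_by_host", hostname="host-1")` twice with the same input returns the same order both times, which is the property the mode relies on.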
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil merged pull request #15046: Add different modes to sort dag files for parsing

Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #15046:
URL: https://github.com/apache/airflow/pull/15046


   





[GitHub] [airflow] ashb commented on a change in pull request #15046: Add different modes to sort dag files for parsing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15046:
URL: https://github.com/apache/airflow/pull/15046#discussion_r603177833



##########
File path: airflow/configuration.py
##########
@@ -253,6 +253,18 @@ def _validate_config_dependencies(self):
                     + ", ".join(start_method_options)
                 )
 
+        if self.has_option("scheduler", "file_parsing_sort_mode"):
+            list_mode = self.get("scheduler", "file_parsing_sort_mode")
+            file_parser_modes = {"modified_time", "random_seeded_by_host", "alphabetical"}
+
+            if list_mode not in file_parser_modes:
+                raise AirflowConfigException(
+                    "`[scheduler] file_parsing_sort_mode` should not be "
+                    + list_mode

Review comment:
       ```suggestion
                       + repr(list_mode)
   ```
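
A small sketch of why `repr()` may be preferable here: it quotes the value, so an empty string or stray whitespace in the configured value stays visible in the error message. The `build_message` helper is hypothetical and exists only for this illustration.

```python
def build_message(list_mode: str) -> str:
    """Build an error message that shows the offending value unambiguously."""
    return "`[scheduler] file_parsing_sort_mode` should not be " + repr(list_mode)


# A trailing space and an empty value are both easy to spot once quoted.
print(build_message("modified_time "))
print(build_message(""))
```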







[GitHub] [airflow] kaxil commented on a change in pull request #15046: Add different modes to sort dag files for parsing

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15046:
URL: https://github.com/apache/airflow/pull/15046#discussion_r602707700



##########
File path: airflow/utils/dag_processing.py
##########
@@ -1017,25 +1019,58 @@ def prepare_file_path_queue(self):
         # processed recently, wait until the next batch
         file_paths_in_progress = self._processors.keys()
         now = timezone.utcnow()
+
+        # Sort the file paths by the parsing order mode
+        list_mode = conf.get("scheduler", "file_parsing_sort_mode", fallback="modified_time")
+
+        if list_mode not in FILE_PARSER_MODES:

Review comment:
       Good point







[GitHub] [airflow] kaxil commented on a change in pull request #15046: Add different modes to sort dag files for parsing

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15046:
URL: https://github.com/apache/airflow/pull/15046#discussion_r603536217



##########
File path: airflow/utils/dag_processing.py
##########
@@ -1017,25 +1019,50 @@ def prepare_file_path_queue(self):
         # processed recently, wait until the next batch
         file_paths_in_progress = self._processors.keys()
         now = timezone.utcnow()
+
+        # Sort the file paths by the parsing order mode
+        list_mode = conf.get("scheduler", "file_parsing_sort_mode")
+
+        files_with_mtime = {}
+        file_paths = []
+        is_mtime_mode = list_mode == "modified_time"
+
         file_paths_recently_processed = []
         for file_path in self._file_paths:
+
+            if is_mtime_mode:
+                files_with_mtime[file_path] = os.path.getmtime(file_path)
+            else:
+                file_paths.append(file_path)
+
+            # Find file paths that were recently processed
             last_finish_time = self.get_last_finish_time(file_path)
             if (
                 last_finish_time is not None
                 and (now - last_finish_time).total_seconds() < self._file_process_interval
             ):
                 file_paths_recently_processed.append(file_path)
 
+        # Sort file paths via last modified time
+        if is_mtime_mode:
+            file_paths = sorted(files_with_mtime, key=files_with_mtime.get, reverse=True)
+        elif list_mode == "alphabetical":
+            file_paths = sorted(file_paths)
+        elif list_mode == "random_seeded_by_host":
+            # Shuffle the list seeded by hostname so multiple schedulers can work on different
+            # set of files. Since we set the seed, the sort order will remain same per host
+            random.Random(get_hostname()).shuffle(file_paths)
+
         files_paths_at_run_limit = [
             file_path for file_path, stat in self._file_stats.items() if stat.run_count == self._max_runs
         ]
 
-        files_paths_to_queue = list(
-            set(self._file_paths)
-            - set(file_paths_in_progress)
-            - set(file_paths_recently_processed)
-            - set(files_paths_at_run_limit)
-        )
+        files_paths_to_queue = [
+            file_path
+            for file_path in file_paths
+            if file_path
+            not in set(file_paths_in_progress).union(file_paths_recently_processed, files_paths_at_run_limit)
+        ]

Review comment:
       Updated in 4abb94edf







[GitHub] [airflow] ashb commented on a change in pull request #15046: Add different modes to sort dag files for parsing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15046:
URL: https://github.com/apache/airflow/pull/15046#discussion_r603544147



##########
File path: airflow/utils/dag_processing.py
##########
@@ -1017,26 +1019,54 @@ def prepare_file_path_queue(self):
         # processed recently, wait until the next batch
         file_paths_in_progress = self._processors.keys()
         now = timezone.utcnow()
+
+        # Sort the file paths by the parsing order mode
+        list_mode = conf.get("scheduler", "file_parsing_sort_mode")
+
+        files_with_mtime = {}
+        file_paths = []
+        is_mtime_mode = list_mode == "modified_time"
+
         file_paths_recently_processed = []
         for file_path in self._file_paths:
+
+            if is_mtime_mode:
+                files_with_mtime[file_path] = os.path.getmtime(file_path)
+            else:
+                file_paths.append(file_path)
+
+            # Find file paths that were recently processed
             last_finish_time = self.get_last_finish_time(file_path)
             if (
                 last_finish_time is not None
                 and (now - last_finish_time).total_seconds() < self._file_process_interval
             ):
                 file_paths_recently_processed.append(file_path)
 
+        # Sort file paths via last modified time
+        if is_mtime_mode:
+            file_paths = sorted(files_with_mtime, key=files_with_mtime.get, reverse=True)
+        elif list_mode == "alphabetical":
+            file_paths = sorted(file_paths)
+        elif list_mode == "random_seeded_by_host":
+            # Shuffle the list seeded by hostname so multiple schedulers can work on different
+            # set of files. Since we set the seed, the sort order will remain same per host
+            random.Random(get_hostname()).shuffle(file_paths)
+
         files_paths_at_run_limit = [
             file_path for file_path, stat in self._file_stats.items() if stat.run_count == self._max_runs
         ]
 
-        files_paths_to_queue = list(
-            set(self._file_paths)
-            - set(file_paths_in_progress)
-            - set(file_paths_recently_processed)
-            - set(files_paths_at_run_limit)
+        # Do not convert the following list to set as set does not preserve the order
+        # and we need to maintain the order of file_paths for `[scheduler] file_parsing_sort_mode`
+        file_paths_to_exclude = set(file_paths_in_progress).union(
+            file_paths_recently_processed, files_paths_at_run_limit
         )
 
+        files_paths_to_queue = [

Review comment:
       ```suggestion
           file_paths_to_exclude = set(file_paths_in_progress).union(
               file_paths_recently_processed, files_paths_at_run_limit
           )
   
           # Do not convert the following list to set as set does not preserve the order
           # and we need to maintain the order of file_paths for `[scheduler] file_parsing_sort_mode`
           files_paths_to_queue = [
   ```
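
The ordering concern in that comment can be seen in a minimal sketch with invented file names (the variable names loosely follow the diff, but the data is made up). Set difference returns a set, whose iteration order is not tied to the sorted input, while a list comprehension keeps the input order.

```python
# Already in the desired parsing order, e.g. after sorting by mode.
file_paths = ["c.py", "a.py", "d.py", "b.py"]
file_paths_to_exclude = {"a.py"}

# Set arithmetic: correct members, but the order is not guaranteed.
unordered = list(set(file_paths) - file_paths_to_exclude)

# List comprehension over the ordered list: same members, order preserved.
ordered = [path for path in file_paths if path not in file_paths_to_exclude]

print(ordered)
```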







[GitHub] [airflow] ashb commented on a change in pull request #15046: Add different modes to sort dag files for parsing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15046:
URL: https://github.com/apache/airflow/pull/15046#discussion_r603532950



##########
File path: airflow/utils/dag_processing.py
##########
@@ -1017,25 +1019,50 @@ def prepare_file_path_queue(self):
         # processed recently, wait until the next batch
         file_paths_in_progress = self._processors.keys()
         now = timezone.utcnow()
+
+        # Sort the file paths by the parsing order mode
+        list_mode = conf.get("scheduler", "file_parsing_sort_mode")
+
+        files_with_mtime = {}
+        file_paths = []
+        is_mtime_mode = list_mode == "modified_time"
+
         file_paths_recently_processed = []
         for file_path in self._file_paths:
+
+            if is_mtime_mode:
+                files_with_mtime[file_path] = os.path.getmtime(file_path)
+            else:
+                file_paths.append(file_path)
+
+            # Find file paths that were recently processed
             last_finish_time = self.get_last_finish_time(file_path)
             if (
                 last_finish_time is not None
                 and (now - last_finish_time).total_seconds() < self._file_process_interval
             ):
                 file_paths_recently_processed.append(file_path)
 
+        # Sort file paths via last modified time
+        if is_mtime_mode:
+            file_paths = sorted(files_with_mtime, key=files_with_mtime.get, reverse=True)
+        elif list_mode == "alphabetical":
+            file_paths = sorted(file_paths)
+        elif list_mode == "random_seeded_by_host":
+            # Shuffle the list seeded by hostname so multiple schedulers can work on different
+            # set of files. Since we set the seed, the sort order will remain same per host
+            random.Random(get_hostname()).shuffle(file_paths)
+
         files_paths_at_run_limit = [
             file_path for file_path, stat in self._file_stats.items() if stat.run_count == self._max_runs
         ]
 
-        files_paths_to_queue = list(
-            set(self._file_paths)
-            - set(file_paths_in_progress)
-            - set(file_paths_recently_processed)
-            - set(files_paths_at_run_limit)
-        )
+        files_paths_to_queue = [
+            file_path
+            for file_path in file_paths
+            if file_path
+            not in set(file_paths_in_progress).union(file_paths_recently_processed, files_paths_at_run_limit)
+        ]

Review comment:
       OH YEAH :D







[GitHub] [airflow] kaxil commented on a change in pull request #15046: Add different modes to sort dag files for parsing

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15046:
URL: https://github.com/apache/airflow/pull/15046#discussion_r602710231



##########
File path: airflow/utils/dag_processing.py
##########
@@ -1017,25 +1019,58 @@ def prepare_file_path_queue(self):
         # processed recently, wait until the next batch
         file_paths_in_progress = self._processors.keys()
         now = timezone.utcnow()
+
+        # Sort the file paths by the parsing order mode
+        list_mode = conf.get("scheduler", "file_parsing_sort_mode", fallback="modified_time")
+
+        if list_mode not in FILE_PARSER_MODES:

Review comment:
       Done in https://github.com/apache/airflow/pull/15046/commits/ade62230ac3dc2f3984772fcc57d23c6f20cc79d







[GitHub] [airflow] ashb commented on a change in pull request #15046: Add different modes to sort dag files for parsing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15046:
URL: https://github.com/apache/airflow/pull/15046#discussion_r603177507



##########
File path: airflow/config_templates/config.yml
##########
@@ -1834,6 +1834,22 @@
       type: string
       example: ~
       default: "2"
+    - name: file_parsing_sort_mode
+      description: |
+        One of ``modified_time``, ``random_seeded_by_host`` and ``alphabetical``.
+        The scheduler will list and sort the dag files to decide the parsing order.
+
+        * ``modified_time``: Sort by modified time of the files. This is useful on large scale to parse the
+          recently modified DAGs first.
+        * ``random_seeded_by_host``: Sort randomly across multiple Schedulers but with same order on the
+          same host. This is useful when running with Scheduler in HA mode where each scheduler can
+          parse different DAG files.
+        * ``alphabetical``: Sort by filename
+
+      version_added: 2.0.2

Review comment:
       Shouldn't this be 2.1? It feels like a new feature, not a bug fix to me







[GitHub] [airflow] ashb commented on a change in pull request #15046: Add different modes to sort dag files for parsing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15046:
URL: https://github.com/apache/airflow/pull/15046#discussion_r603178006



##########
File path: airflow/configuration.py
##########
@@ -253,6 +253,18 @@ def _validate_config_dependencies(self):
                     + ", ".join(start_method_options)
                 )
 
+        if self.has_option("scheduler", "file_parsing_sort_mode"):
+            list_mode = self.get("scheduler", "file_parsing_sort_mode")
+            file_parser_modes = {"modified_time", "random_seeded_by_host", "alphabetical"}
+
+            if list_mode not in file_parser_modes:
+                raise AirflowConfigException(
+                    "`[scheduler] file_parsing_sort_mode` should not be "
+                    + list_mode

Review comment:
       Maybe use an f-string for this whole lot too?
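
A minimal sketch of what an f-string version could look like. The quoted hunk is cut off after `+ list_mode`, so the "Possible values" tail below is an assumption, and `ConfigError` is a stand-in for `AirflowConfigException` so the example runs without Airflow installed.

```python
FILE_PARSER_MODES = {"modified_time", "random_seeded_by_host", "alphabetical"}


class ConfigError(Exception):
    """Stand-in for AirflowConfigException in this sketch."""


def validate_sort_mode(list_mode: str) -> None:
    """Raise ConfigError if list_mode is not a supported sort mode."""
    if list_mode not in FILE_PARSER_MODES:
        # `!r` applies repr() inside the f-string, so the value is quoted.
        raise ConfigError(
            f"`[scheduler] file_parsing_sort_mode` should not be {list_mode!r}. "
            f"Possible values are {', '.join(sorted(FILE_PARSER_MODES))}."
        )
```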







[GitHub] [airflow] kaxil commented on a change in pull request #15046: Add different modes to sort dag files for parsing

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15046:
URL: https://github.com/apache/airflow/pull/15046#discussion_r603359429



##########
File path: airflow/configuration.py
##########
@@ -253,6 +253,18 @@ def _validate_config_dependencies(self):
                     + ", ".join(start_method_options)
                 )
 
+        if self.has_option("scheduler", "file_parsing_sort_mode"):
+            list_mode = self.get("scheduler", "file_parsing_sort_mode")
+            file_parser_modes = {"modified_time", "random_seeded_by_host", "alphabetical"}
+
+            if list_mode not in file_parser_modes:
+                raise AirflowConfigException(
+                    "`[scheduler] file_parsing_sort_mode` should not be "
+                    + list_mode

Review comment:
       fixed in https://github.com/apache/airflow/pull/15046/commits/e394febf64baffbeee34bc0383c042262659b424

##########
File path: airflow/config_templates/config.yml
##########
@@ -1834,6 +1834,22 @@
       type: string
       example: ~
       default: "2"
+    - name: file_parsing_sort_mode
+      description: |
+        One of ``modified_time``, ``random_seeded_by_host`` and ``alphabetical``.
+        The scheduler will list and sort the dag files to decide the parsing order.
+
+        * ``modified_time``: Sort by modified time of the files. This is useful on large scale to parse the
+          recently modified DAGs first.
+        * ``random_seeded_by_host``: Sort randomly across multiple Schedulers but with same order on the
+          same host. This is useful when running with Scheduler in HA mode where each scheduler can
+          parse different DAG files.
+        * ``alphabetical``: Sort by filename
+
+      version_added: 2.0.2

Review comment:
       fixed in https://github.com/apache/airflow/pull/15046/commits/e394febf64baffbeee34bc0383c042262659b424










[GitHub] [airflow] ashb commented on a change in pull request #15046: Add different modes to sort dag files for parsing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15046:
URL: https://github.com/apache/airflow/pull/15046#discussion_r603540241



##########
File path: airflow/utils/dag_processing.py
##########
@@ -1017,25 +1019,50 @@ def prepare_file_path_queue(self):
         # processed recently, wait until the next batch
         file_paths_in_progress = self._processors.keys()
         now = timezone.utcnow()
+
+        # Sort the file paths by the parsing order mode
+        list_mode = conf.get("scheduler", "file_parsing_sort_mode")
+
+        files_with_mtime = {}
+        file_paths = []
+        is_mtime_mode = list_mode == "modified_time"
+
         file_paths_recently_processed = []
         for file_path in self._file_paths:
+
+            if is_mtime_mode:
+                files_with_mtime[file_path] = os.path.getmtime(file_path)
+            else:
+                file_paths.append(file_path)
+
+            # Find file paths that were recently processed
             last_finish_time = self.get_last_finish_time(file_path)
             if (
                 last_finish_time is not None
                 and (now - last_finish_time).total_seconds() < self._file_process_interval
             ):
                 file_paths_recently_processed.append(file_path)
 
+        # Sort file paths via last modified time
+        if is_mtime_mode:
+            file_paths = sorted(files_with_mtime, key=files_with_mtime.get, reverse=True)
+        elif list_mode == "alphabetical":
+            file_paths = sorted(file_paths)
+        elif list_mode == "random_seeded_by_host":
+            # Shuffle the list seeded by hostname so multiple schedulers can work on different
+            # set of files. Since we set the seed, the sort order will remain same per host
+            random.Random(get_hostname()).shuffle(file_paths)
+
         files_paths_at_run_limit = [
             file_path for file_path, stat in self._file_stats.items() if stat.run_count == self._max_runs
         ]
 
-        files_paths_to_queue = list(
-            set(self._file_paths)
-            - set(file_paths_in_progress)
-            - set(file_paths_recently_processed)
-            - set(files_paths_at_run_limit)
-        )
+        files_paths_to_queue = [
+            file_path
+            for file_path in file_paths
+            if file_path
+            not in set(file_paths_in_progress).union(file_paths_recently_processed, files_paths_at_run_limit)
+        ]

Review comment:
       ```
   In [25]: %timeit [ file_path for file_path in files if file_path not in set(s1).union(s2, s3) ]                                                                                                                                                      
   225 ms ± 20.7 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
   
   In [26]: %timeit excl = set(s1).union(s2, s3); [ file_path for file_path in files if file_path not in excl ]                                                                                                                                         
   1.44 ms ± 31.8 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
   ```
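
The gap in those timings comes from rebuilding the exclusion set once per element instead of once overall. A self-contained sketch with invented data (the names `files`, `s1`, `s2`, `s3` and `excl` follow the snippet above; the sizes are arbitrary) shows that both forms give the same result:

```python
files = [f"dag_{i}.py" for i in range(1000)]
s1 = files[0:100]
s2 = files[100:200]
s3 = files[200:300]

# Slow form: set(s1).union(s2, s3) is rebuilt for every file in the loop.
slow = [file_path for file_path in files if file_path not in set(s1).union(s2, s3)]

# Fast form: build the exclusion set once, then do O(1) membership checks.
excl = set(s1).union(s2, s3)
fast = [file_path for file_path in files if file_path not in excl]

# Same members, same order; only the amount of work differs.
assert slow == fast
```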







[GitHub] [airflow] ashb commented on a change in pull request #15046: Add different modes to sort dag files for parsing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15046:
URL: https://github.com/apache/airflow/pull/15046#discussion_r602700368



##########
File path: airflow/utils/dag_processing.py
##########
@@ -1017,25 +1019,58 @@ def prepare_file_path_queue(self):
         # processed recently, wait until the next batch
         file_paths_in_progress = self._processors.keys()
         now = timezone.utcnow()
+
+        # Sort the file paths by the parsing order mode
+        list_mode = conf.get("scheduler", "file_parsing_sort_mode", fallback="modified_time")
+
+        if list_mode not in FILE_PARSER_MODES:

Review comment:
       This should be in configuration.py's validate method







[GitHub] [airflow] ashb commented on a change in pull request #15046: Add different modes to sort dag files for parsing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15046:
URL: https://github.com/apache/airflow/pull/15046#discussion_r603179478



##########
File path: airflow/utils/dag_processing.py
##########
@@ -1017,25 +1019,50 @@ def prepare_file_path_queue(self):
         # processed recently, wait until the next batch
         file_paths_in_progress = self._processors.keys()
         now = timezone.utcnow()
+
+        # Sort the file paths by the parsing order mode
+        list_mode = conf.get("scheduler", "file_parsing_sort_mode")
+
+        files_with_mtime = {}
+        file_paths = []
+        is_mtime_mode = list_mode == "modified_time"
+
         file_paths_recently_processed = []
         for file_path in self._file_paths:
+
+            if is_mtime_mode:
+                files_with_mtime[file_path] = os.path.getmtime(file_path)
+            else:
+                file_paths.append(file_path)
+
+            # Find file paths that were recently processed
             last_finish_time = self.get_last_finish_time(file_path)
             if (
                 last_finish_time is not None
                 and (now - last_finish_time).total_seconds() < self._file_process_interval
             ):
                 file_paths_recently_processed.append(file_path)
 
+        # Sort file paths via last modified time
+        if is_mtime_mode:
+            file_paths = sorted(files_with_mtime, key=files_with_mtime.get, reverse=True)
+        elif list_mode == "alphabetical":
+            file_paths = sorted(file_paths)
+        elif list_mode == "random_seeded_by_host":
+            # Shuffle the list seeded by hostname so multiple schedulers can work on different
+            # set of files. Since we set the seed, the sort order will remain same per host
+            random.Random(get_hostname()).shuffle(file_paths)
+
         files_paths_at_run_limit = [
             file_path for file_path, stat in self._file_stats.items() if stat.run_count == self._max_runs
         ]
 
-        files_paths_to_queue = list(
-            set(self._file_paths)
-            - set(file_paths_in_progress)
-            - set(file_paths_recently_processed)
-            - set(files_paths_at_run_limit)
-        )
+        files_paths_to_queue = [
+            file_path
+            for file_path in file_paths
+            if file_path
+            not in set(file_paths_in_progress).union(file_paths_recently_processed, files_paths_at_run_limit)
+        ]

Review comment:
       Can you explain/describe why you changed this block?







[GitHub] [airflow] kaxil commented on a change in pull request #15046: Add different modes to sort dag files for parsing

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15046:
URL: https://github.com/apache/airflow/pull/15046#discussion_r603359237



##########
File path: airflow/utils/dag_processing.py
##########
@@ -1017,25 +1019,50 @@ def prepare_file_path_queue(self):
         # processed recently, wait until the next batch
         file_paths_in_progress = self._processors.keys()
         now = timezone.utcnow()
+
+        # Sort the file paths by the parsing order mode
+        list_mode = conf.get("scheduler", "file_parsing_sort_mode")
+
+        files_with_mtime = {}
+        file_paths = []
+        is_mtime_mode = list_mode == "modified_time"
+
         file_paths_recently_processed = []
         for file_path in self._file_paths:
+
+            if is_mtime_mode:
+                files_with_mtime[file_path] = os.path.getmtime(file_path)
+            else:
+                file_paths.append(file_path)
+
+            # Find file paths that were recently processed
             last_finish_time = self.get_last_finish_time(file_path)
             if (
                 last_finish_time is not None
                 and (now - last_finish_time).total_seconds() < self._file_process_interval
             ):
                 file_paths_recently_processed.append(file_path)
 
+        # Sort file paths via last modified time
+        if is_mtime_mode:
+            file_paths = sorted(files_with_mtime, key=files_with_mtime.get, reverse=True)
+        elif list_mode == "alphabetical":
+            file_paths = sorted(file_paths)
+        elif list_mode == "random_seeded_by_host":
+            # Shuffle the list seeded by hostname so multiple schedulers can work on different
+            # set of files. Since we set the seed, the sort order will remain same per host
+            random.Random(get_hostname()).shuffle(file_paths)
+
         files_paths_at_run_limit = [
             file_path for file_path, stat in self._file_stats.items() if stat.run_count == self._max_runs
         ]
 
-        files_paths_to_queue = list(
-            set(self._file_paths)
-            - set(file_paths_in_progress)
-            - set(file_paths_recently_processed)
-            - set(files_paths_at_run_limit)
-        )
+        files_paths_to_queue = [
+            file_path
+            for file_path in file_paths
+            if file_path
+            not in set(file_paths_in_progress).union(file_paths_recently_processed, files_paths_at_run_limit)
+        ]

Review comment:
       Added a comment before the code block to explain.
   
   The old code converted the list to a set just to exclude certain items, so the order of `file_paths` was no longer maintained, which would make the sort modes useless. The new code keeps a list, so insertion order is preserved, and removes any item that also appears in `file_paths_in_progress`, `file_paths_recently_processed` or `files_paths_at_run_limit`.
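
The ordering issue can be reproduced outside Airflow. This is a minimal sketch, not the code from the PR: the file names and the helper `filter_preserving_order` are hypothetical and exist only for illustration.

```python
def filter_preserving_order(file_paths, *excluded_groups):
    """Return file_paths minus anything in excluded_groups, keeping the input order."""
    excluded = set().union(*excluded_groups)
    return [path for path in file_paths if path not in excluded]


# Hypothetical list that has already been sorted by the chosen mode
# (for example, most recently modified first).
sorted_paths = ["c.py", "a.py", "d.py", "b.py"]
in_progress = ["a.py"]
recently_processed = ["d.py"]
at_run_limit = []

queue = filter_preserving_order(sorted_paths, in_progress, recently_processed, at_run_limit)
print(queue)  # ['c.py', 'b.py']

# Set subtraction keeps the same members, but a set has no defined order,
# so converting it back to a list may not follow sorted_paths.
as_set = set(sorted_paths) - set(in_progress) - set(recently_processed) - set(at_run_limit)
```

Both approaches select the same files; only the list-based filter guarantees that the queue follows the sort order.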







[GitHub] [airflow] jedcunningham commented on a change in pull request #15046: Add different modes to sort dag files for parsing

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #15046:
URL: https://github.com/apache/airflow/pull/15046#discussion_r602791196



##########
File path: airflow/configuration.py
##########
@@ -253,6 +253,18 @@ def _validate_config_dependencies(self):
                     + ", ".join(start_method_options)
                 )
 
+        if self.has_option("scheduler", "file_parsing_sort_mode"):
+            list_mode = self.get("scheduler", "file_parsing_sort_mode", fallback="modified_time")

Review comment:
       ```suggestion
               list_mode = self.get("scheduler", "file_parsing_sort_mode")
   ```
   
   We get the fallback for free from `default_airflow.cfg`, no need to duplicate it here.
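
The general idea of layering user settings over shipped defaults can be sketched with the standard-library `configparser`. This is only an illustration under stated assumptions: it stands in for Airflow's own configuration object, whose internals are not shown in this thread, and the config strings below are made up.

```python
import configparser

# Stand-in for the defaults shipped with the application (assumed content).
DEFAULT_CFG = """
[scheduler]
file_parsing_sort_mode = modified_time
"""

# Stand-in for a user config that does not set the option.
USER_CFG = """
[scheduler]
"""

conf = configparser.ConfigParser()
conf.read_string(DEFAULT_CFG)  # defaults are loaded first
conf.read_string(USER_CFG)     # user values, if any, override them

# No explicit fallback argument is needed: the default is already present.
mode = conf.get("scheduler", "file_parsing_sort_mode")
print(mode)
```

Because the default lives in one place, changing it later does not require touching every call site.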

##########
File path: airflow/utils/dag_processing.py
##########
@@ -1017,25 +1019,50 @@ def prepare_file_path_queue(self):
         # processed recently, wait until the next batch
         file_paths_in_progress = self._processors.keys()
         now = timezone.utcnow()
+
+        # Sort the file paths by the parsing order mode
+        list_mode = conf.get("scheduler", "file_parsing_sort_mode", fallback="modified_time")

Review comment:
       ```suggestion
           list_mode = conf.get("scheduler", "file_parsing_sort_mode")
   ```







[GitHub] [airflow] ashb commented on a change in pull request #15046: Add different modes to sort dag files for parsing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15046:
URL: https://github.com/apache/airflow/pull/15046#discussion_r603533341



##########
File path: airflow/utils/dag_processing.py
##########
@@ -1017,25 +1019,50 @@ def prepare_file_path_queue(self):
         # processed recently, wait until the next batch
         file_paths_in_progress = self._processors.keys()
         now = timezone.utcnow()
+
+        # Sort the file paths by the parsing order mode
+        list_mode = conf.get("scheduler", "file_parsing_sort_mode")
+
+        files_with_mtime = {}
+        file_paths = []
+        is_mtime_mode = list_mode == "modified_time"
+
         file_paths_recently_processed = []
         for file_path in self._file_paths:
+
+            if is_mtime_mode:
+                files_with_mtime[file_path] = os.path.getmtime(file_path)
+            else:
+                file_paths.append(file_path)
+
+            # Find file paths that were recently processed
             last_finish_time = self.get_last_finish_time(file_path)
             if (
                 last_finish_time is not None
                 and (now - last_finish_time).total_seconds() < self._file_process_interval
             ):
                 file_paths_recently_processed.append(file_path)
 
+        # Sort file paths via last modified time
+        if is_mtime_mode:
+            file_paths = sorted(files_with_mtime, key=files_with_mtime.get, reverse=True)
+        elif list_mode == "alphabetical":
+            file_paths = sorted(file_paths)
+        elif list_mode == "random_seeded_by_host":
+            # Shuffle the list seeded by hostname so multiple schedulers can work on different
+            # set of files. Since we set the seed, the sort order will remain same per host
+            random.Random(get_hostname()).shuffle(file_paths)
+
         files_paths_at_run_limit = [
             file_path for file_path, stat in self._file_stats.items() if stat.run_count == self._max_runs
         ]
 
-        files_paths_to_queue = list(
-            set(self._file_paths)
-            - set(file_paths_in_progress)
-            - set(file_paths_recently_processed)
-            - set(files_paths_at_run_limit)
-        )
+        files_paths_to_queue = [
+            file_path
+            for file_path in file_paths
+            if file_path
+            not in set(file_paths_in_progress).union(file_paths_recently_processed, files_paths_at_run_limit)
+        ]

Review comment:
       One thing: this may create the "removal set" once for each file in `file_paths`, since the set is built inside the comprehension's condition.
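
A small experiment makes the cost visible. This is a sketch with hypothetical names (`removal_set`, `calls`) and made-up file names; it counts how often the set is built when the construction sits inside the comprehension's condition versus when it is hoisted out.

```python
calls = {"n": 0}  # counts how many times the removal set is constructed


def removal_set(*groups):
    """Build the set of paths to exclude and record that a build happened."""
    calls["n"] += 1
    return set().union(*groups)


file_paths = ["c.py", "a.py", "d.py", "b.py"]
in_progress = ["a.py"]
recently_processed = ["d.py"]

# Set built inside the condition: the expression runs once per element.
inline = [p for p in file_paths if p not in removal_set(in_progress, recently_processed)]
inline_builds = calls["n"]

# Set built once before the comprehension.
calls["n"] = 0
excluded = removal_set(in_progress, recently_processed)
hoisted = [p for p in file_paths if p not in excluded]
hoisted_builds = calls["n"]

print(inline_builds, hoisted_builds)
```

Both variants produce the same queue; hoisting only avoids rebuilding the set for every file.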



