Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/02/10 19:33:26 UTC

[GitHub] [airflow] pingzh opened a new pull request #21501: Support different timeout value for dag file parsing

pingzh opened a new pull request #21501:
URL: https://github.com/apache/airflow/pull/21501


   This adds a hook for infra users to dynamically control the dag file parsing timeout.
   
   It is useful when a few DAG files require long parsing times while others do not.
   You can control them separately instead of having one value for all DAG files.
   
   If the return value is less than or equal to 0, no timeout is applied during DAG parsing.
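
As a rough illustration of the hook described above, an `airflow_local_settings.py` might contain something like the sketch below. The `"slow"` path check and the two timeout constants are assumptions made up for this example; a real deployment would more likely fall back to the configured `[core] dagbag_import_timeout` value instead of a hard-coded default.

```python
from typing import Union

# Hypothetical values chosen for illustration only.
DEFAULT_TIMEOUT_SECONDS = 30.0
SLOW_DAG_TIMEOUT_SECONDS = 90.0


def get_dagbag_import_timeout(dag_file_path: str) -> Union[int, float]:
    """Return the parsing timeout (in seconds) for the given DAG file."""
    # Assumption: slow-to-parse DAG files have "slow" somewhere in their path.
    if "slow" in dag_file_path:
        return SLOW_DAG_TIMEOUT_SECONDS
    # Every other file keeps the (hypothetical) default.
    return DEFAULT_TIMEOUT_SECONDS
```

Keying the decision on the file path keeps the hook cheap, which matters because it runs before every DAG file is parsed.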
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #21501: Support different timeout value for dag file parsing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21501:
URL: https://github.com/apache/airflow/pull/21501#discussion_r804638708



##########
File path: tests/models/test_dagbag.py
##########
@@ -206,6 +207,49 @@ def test_zip(self):
         assert dagbag.get_dag("test_zip_dag")
         assert sys.path == syspath_before  # sys.path doesn't change
 
+    @patch("airflow.models.dagbag.timeout")
+    @patch("airflow.models.dagbag.settings.get_dagbag_import_timeout")
+    def test_process_dag_file_without_timeout(self, mocked_get_dagbag_import_timeout, mocked_timeout):
+        """
+        Test dag file parsing without timeout
+        """
+        mocked_get_dagbag_import_timeout.return_value = 0
+
+        dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=False)
+        dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, 'test_default_views.py'))
+        mocked_timeout.assert_not_called()
+
+    @patch("airflow.models.dagbag.timeout")
+    @patch("airflow.models.dagbag.settings.get_dagbag_import_timeout")
+    def test_process_dag_file_with_non_default_timeout(
+        self, mocked_get_dagbag_import_timeout, mocked_timeout
+    ):
+        """
+        Test customized dag file parsing timeout
+        """
+        timeout_value = 100
+        mocked_get_dagbag_import_timeout.return_value = timeout_value
+
+        # ensure the test value is not equal to the default value
+        assert timeout_value != settings.conf.getfloat('core', 'DAGBAG_IMPORT_TIMEOUT')
+
+        dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=False)
+        dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, 'test_default_views.py'))
+
+        mocked_timeout.assert_called_once_with(timeout_value, error_message=mock.ANY)
+
+    @patch("airflow.models.dagbag.settings.get_dagbag_import_timeout")
+    def test_check_value_type_from_get_dagbag_import_timeout(self, mocked_get_dagbag_import_timeout):
+        """
+        Test correctness of value from get_dagbag_import_timeout
+        """
+        mocked_get_dagbag_import_timeout.return_value = '1'
+
+        dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=False)
+        with pytest.raises(TypeError) as e_info:
+            dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, 'test_default_views.py'))
+        assert str(e_info.value) == 'Value (1) from get_dagbag_import_timeout must be int or float'

Review comment:
       ```suggestion
           with pytest.raises(TypeError, match=r"Value \(1\) from get_dagbag_import_timeout must be int or float"):
               dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, 'test_default_views.py'))
   ```
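
The escaping in the suggestion matters because `pytest.raises(..., match=...)` treats its argument as a regular expression and checks it with `re.search` against the string form of the exception. A small stdlib-only sketch of that check follows; the `message` variable simply reuses the error text from the test above.

```python
import re

message = "Value (1) from get_dagbag_import_timeout must be int or float"

# Unescaped parentheses form a regex group, so this pattern looks for
# "Value 1 from ..." and does not match the literal "(1)".
unescaped = "Value (1) from get_dagbag_import_timeout must be int or float"
assert re.search(unescaped, message) is None

# Escaping the parentheses (by hand or with re.escape) matches the literal text.
escaped = r"Value \(1\) from get_dagbag_import_timeout must be int or float"
assert re.search(escaped, message) is not None
assert re.search(re.escape(message), message) is not None
```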







[GitHub] [airflow] pingzh commented on a change in pull request #21501: Support different timeout value for dag file parsing

Posted by GitBox <gi...@apache.org>.
pingzh commented on a change in pull request #21501:
URL: https://github.com/apache/airflow/pull/21501#discussion_r807133702



##########
File path: docs/apache-airflow/faq.rst
##########
@@ -119,6 +119,30 @@ How do I trigger tasks based on another task's failure?
 
 You can achieve this with :ref:`concepts:trigger-rules`.
 
+How to control DAG file parsing timeout for different DAG files?
+---------------------------------------------------------------------------------
+(only valid for Airflow >= 2.3.0)
+
+You can add a function ``get_dagbag_import_timeout`` in the ``airflow_local_settings.py``. This function gets
+called right before a DAG file is parsed. You can return different timeout value based on the DAG file.
+When the return value is less than or equal to 0, it means no timeout during the DAG parsing.
+
+.. code-block:: python
+   :caption: airflow_local_settings.py
+   :name: airflow_local_settings.py
+
+    def get_dagbag_import_timeout(dag_file_path: str) -> Union[int, float]:
+        """
+        This setting allows to dynamically control the DAG file parsing timeout.
+
+        It is useful when there are a few DAG files requiring longer parsing times, while others do not.
+        You can control them separately instead of having one value for all DAG files.
+
+        If the return value is less than or equal to 0, it means no timeout during the DAG parsing.
+        """

Review comment:
       good idea







[GitHub] [airflow] ashb commented on a change in pull request #21501: Support different timeout value for dag file parsing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21501:
URL: https://github.com/apache/airflow/pull/21501#discussion_r804638708



##########
File path: tests/models/test_dagbag.py
##########
@@ -206,6 +207,49 @@ def test_zip(self):
         assert dagbag.get_dag("test_zip_dag")
         assert sys.path == syspath_before  # sys.path doesn't change
 
+    @patch("airflow.models.dagbag.timeout")
+    @patch("airflow.models.dagbag.settings.get_dagbag_import_timeout")
+    def test_process_dag_file_without_timeout(self, mocked_get_dagbag_import_timeout, mocked_timeout):
+        """
+        Test dag file parsing without timeout
+        """
+        mocked_get_dagbag_import_timeout.return_value = 0
+
+        dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=False)
+        dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, 'test_default_views.py'))
+        mocked_timeout.assert_not_called()
+
+    @patch("airflow.models.dagbag.timeout")
+    @patch("airflow.models.dagbag.settings.get_dagbag_import_timeout")
+    def test_process_dag_file_with_non_default_timeout(
+        self, mocked_get_dagbag_import_timeout, mocked_timeout
+    ):
+        """
+        Test customized dag file parsing timeout
+        """
+        timeout_value = 100
+        mocked_get_dagbag_import_timeout.return_value = timeout_value
+
+        # ensure the test value is not equal to the default value
+        assert timeout_value != settings.conf.getfloat('core', 'DAGBAG_IMPORT_TIMEOUT')
+
+        dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=False)
+        dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, 'test_default_views.py'))
+
+        mocked_timeout.assert_called_once_with(timeout_value, error_message=mock.ANY)
+
+    @patch("airflow.models.dagbag.settings.get_dagbag_import_timeout")
+    def test_check_value_type_from_get_dagbag_import_timeout(self, mocked_get_dagbag_import_timeout):
+        """
+        Test correctness of value from get_dagbag_import_timeout
+        """
+        mocked_get_dagbag_import_timeout.return_value = '1'
+
+        dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=False)
+        with pytest.raises(TypeError) as e_info:
+            dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, 'test_default_views.py'))
+        assert str(e_info.value) == 'Value (1) from get_dagbag_import_timeout must be int or float'

Review comment:
       ```suggestion
           with pytest.raises(TypeError, match=r"Value \(1\) from get_dagbag_import_timeout must be int or float"):
               dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, 'test_default_views.py'))
   ```







[GitHub] [airflow] ashb commented on a change in pull request #21501: Support different timeout value for dag file parsing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21501:
URL: https://github.com/apache/airflow/pull/21501#discussion_r804638350



##########
File path: tests/models/test_dagbag.py
##########
@@ -206,6 +207,49 @@ def test_zip(self):
         assert dagbag.get_dag("test_zip_dag")
         assert sys.path == syspath_before  # sys.path doesn't change
 
+    @patch("airflow.models.dagbag.timeout")
+    @patch("airflow.models.dagbag.settings.get_dagbag_import_timeout")
+    def test_process_dag_file_without_timeout(self, mocked_get_dagbag_import_timeout, mocked_timeout):
+        """
+        Test dag file parsing without timeout
+        """
+        mocked_get_dagbag_import_timeout.return_value = 0
+
+        dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=False)
+        dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, 'test_default_views.py'))
+        mocked_timeout.assert_not_called()
+
+    @patch("airflow.models.dagbag.timeout")
+    @patch("airflow.models.dagbag.settings.get_dagbag_import_timeout")
+    def test_process_dag_file_with_non_default_timeout(
+        self, mocked_get_dagbag_import_timeout, mocked_timeout
+    ):
+        """
+        Test customized dag file parsing timeout
+        """
+        timeout_value = 100
+        mocked_get_dagbag_import_timeout.return_value = timeout_value
+
+        # ensure the test value is not equal to the default value
+        assert timeout_value != settings.conf.getfloat('core', 'DAGBAG_IMPORT_TIMEOUT')
+
+        dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=False)
+        dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, 'test_default_views.py'))
+
+        mocked_timeout.assert_called_once_with(timeout_value, error_message=mock.ANY)
+
+    @patch("airflow.models.dagbag.settings.get_dagbag_import_timeout")
+    def test_check_value_type_from_get_dagbag_import_timeout(self, mocked_get_dagbag_import_timeout):
+        """
+        Test correctness of value from get_dagbag_import_timeout
+        """
+        mocked_get_dagbag_import_timeout.return_value = '1'

Review comment:
       This is testing the error condition though :)







[GitHub] [airflow] ashb commented on a change in pull request #21501: Support different timeout value for dag file parsing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21501:
URL: https://github.com/apache/airflow/pull/21501#discussion_r807351409



##########
File path: docs/apache-airflow/faq.rst
##########
@@ -119,6 +119,34 @@ How do I trigger tasks based on another task's failure?
 
 You can achieve this with :ref:`concepts:trigger-rules`.
 
+How to control DAG file parsing timeout for different DAG files?
+----------------------------------------------------------------
+(only valid for Airflow >= 2.3.0)

Review comment:
       ```suggestion
   
    .. versionadded:: 2.3
    
   ```







[GitHub] [airflow] pingzh commented on pull request #21501: Support different timeout value for dag file parsing

Posted by GitBox <gi...@apache.org>.
pingzh commented on pull request #21501:
URL: https://github.com/apache/airflow/pull/21501#issuecomment-1035877332


   > Oh, and we should add a small section in the docs somewhere too I'd think.
   
   @jedcunningham can you point me to the docs page where I should put it? Thanks.





[GitHub] [airflow] ashb commented on a change in pull request #21501: Support different timeout value for dag file parsing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21501:
URL: https://github.com/apache/airflow/pull/21501#discussion_r804637903



##########
File path: airflow/models/dagbag.py
##########
@@ -310,13 +310,7 @@ def _load_modules_from_file(self, filepath, safe_mode):
         if mod_name in sys.modules:
             del sys.modules[mod_name]
 
-        timeout_msg = (
-            f"DagBag import timeout for {filepath} after {self.DAGBAG_IMPORT_TIMEOUT}s.\n"
-            "Please take a look at these docs to improve your DAG import time:\n"
-            f"* {get_docs_url('best-practices.html#top-level-python-code')}\n"
-            f"* {get_docs_url('best-practices.html#reducing-dag-complexity')}"
-        )
-        with timeout(self.DAGBAG_IMPORT_TIMEOUT, error_message=timeout_msg):

Review comment:
       This constant isn't used anymore either, is it? We should remove it.







[GitHub] [airflow] jedcunningham commented on a change in pull request #21501: Support different timeout value for dag file parsing

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #21501:
URL: https://github.com/apache/airflow/pull/21501#discussion_r804712162



##########
File path: tests/models/test_dagbag.py
##########
@@ -206,6 +207,49 @@ def test_zip(self):
         assert dagbag.get_dag("test_zip_dag")
         assert sys.path == syspath_before  # sys.path doesn't change
 
+    @patch("airflow.models.dagbag.timeout")
+    @patch("airflow.models.dagbag.settings.get_dagbag_import_timeout")
+    def test_process_dag_file_without_timeout(self, mocked_get_dagbag_import_timeout, mocked_timeout):
+        """
+        Test dag file parsing without timeout
+        """
+        mocked_get_dagbag_import_timeout.return_value = 0
+
+        dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=False)
+        dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, 'test_default_views.py'))
+        mocked_timeout.assert_not_called()
+
+    @patch("airflow.models.dagbag.timeout")
+    @patch("airflow.models.dagbag.settings.get_dagbag_import_timeout")
+    def test_process_dag_file_with_non_default_timeout(
+        self, mocked_get_dagbag_import_timeout, mocked_timeout
+    ):
+        """
+        Test customized dag file parsing timeout
+        """
+        timeout_value = 100
+        mocked_get_dagbag_import_timeout.return_value = timeout_value
+
+        # ensure the test value is not equal to the default value
+        assert timeout_value != settings.conf.getfloat('core', 'DAGBAG_IMPORT_TIMEOUT')
+
+        dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=False)
+        dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, 'test_default_views.py'))
+
+        mocked_timeout.assert_called_once_with(timeout_value, error_message=mock.ANY)
+
+    @patch("airflow.models.dagbag.settings.get_dagbag_import_timeout")
+    def test_check_value_type_from_get_dagbag_import_timeout(self, mocked_get_dagbag_import_timeout):
+        """
+        Test correctness of value from get_dagbag_import_timeout
+        """
+        mocked_get_dagbag_import_timeout.return_value = '1'

Review comment:
       🤦‍♂️ Oops







[GitHub] [airflow] jedcunningham commented on a change in pull request #21501: Support different timeout value for dag file parsing

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #21501:
URL: https://github.com/apache/airflow/pull/21501#discussion_r804345052



##########
File path: airflow/settings.py
##########
@@ -209,6 +209,18 @@ def get_airflow_context_vars(context):
     return {}
 
 
+def get_dagbag_import_timeout(dag_file_path: str) -> float:
+    """
+    This setting allows to dynamically control the dag file parsing timeout.

Review comment:
       ```suggestion
       This setting allows for dynamic control of the dag file parsing timeout based on the DAG file path.
   ```

##########
File path: docs/apache-airflow/faq.rst
##########
@@ -119,6 +119,30 @@ How do I trigger tasks based on another task's failure?
 
 You can achieve this with :ref:`concepts:trigger-rules`.
 
+How to control DAG file parsing timeout for different DAG files?
+---------------------------------------------------------------------------------
+(only valid for Airflow >= 2.3.0)
+
+You can add a function ``get_dagbag_import_timeout`` in the ``airflow_local_settings.py``. This function gets
+called right before a DAG file is parsed. You can return different timeout value based on the DAG file.

Review comment:
       ```suggestion
   called right before a DAG file is parsed. You can return a different timeout value based on the DAG file.
   ```

##########
File path: docs/apache-airflow/faq.rst
##########
@@ -119,6 +119,30 @@ How do I trigger tasks based on another task's failure?
 
 You can achieve this with :ref:`concepts:trigger-rules`.
 
+How to control DAG file parsing timeout for different DAG files?
+---------------------------------------------------------------------------------
+(only valid for Airflow >= 2.3.0)

Review comment:
       ```suggestion
   How to control DAG file parsing timeout for different DAG files?
   ----------------------------------------------------------------
   
   (only valid for Airflow >= 2.3.0)
   ```
   
   nit

##########
File path: airflow/settings.py
##########
@@ -209,6 +209,18 @@ def get_airflow_context_vars(context):
     return {}
 
 
+def get_dagbag_import_timeout(dag_file_path: str) -> float:
+    """
+    This setting allows to dynamically control the dag file parsing timeout.

Review comment:
       Was this missed?

##########
File path: docs/apache-airflow/faq.rst
##########
@@ -119,6 +119,30 @@ How do I trigger tasks based on another task's failure?
 
 You can achieve this with :ref:`concepts:trigger-rules`.
 
+How to control DAG file parsing timeout for different DAG files?
+---------------------------------------------------------------------------------
+(only valid for Airflow >= 2.3.0)
+
+You can add a function ``get_dagbag_import_timeout`` in the ``airflow_local_settings.py``. This function gets
+called right before a DAG file is parsed. You can return different timeout value based on the DAG file.
+When the return value is less than or equal to 0, it means no timeout during the DAG parsing.
+
+.. code-block:: python
+   :caption: airflow_local_settings.py
+   :name: airflow_local_settings.py
+
+    def get_dagbag_import_timeout(dag_file_path: str) -> Union[int, float]:
+        """
+        This setting allows to dynamically control the DAG file parsing timeout.
+
+        It is useful when there are a few DAG files requiring longer parsing times, while others do not.
+        You can control them separately instead of having one value for all DAG files.
+
+        If the return value is less than or equal to 0, it means no timeout during the DAG parsing.
+        """

Review comment:
       Maybe have a simple example instead?
   ```suggestion
           if "slow" in dag_file_path:
               return 90
           return conf.getfloat('core', 'DAGBAG_IMPORT_TIMEOUT')
   ```

##########
File path: docs/apache-airflow/faq.rst
##########
@@ -119,6 +119,30 @@ How do I trigger tasks based on another task's failure?
 
 You can achieve this with :ref:`concepts:trigger-rules`.
 
+How to control DAG file parsing timeout for different DAG files?
+---------------------------------------------------------------------------------
+(only valid for Airflow >= 2.3.0)
+
+You can add a function ``get_dagbag_import_timeout`` in the ``airflow_local_settings.py``. This function gets
+called right before a DAG file is parsed. You can return different timeout value based on the DAG file.
+When the return value is less than or equal to 0, it means no timeout during the DAG parsing.
+
+.. code-block:: python
+   :caption: airflow_local_settings.py
+   :name: airflow_local_settings.py
+
+    def get_dagbag_import_timeout(dag_file_path: str) -> Union[int, float]:
+        """
+        This setting allows to dynamically control the DAG file parsing timeout.
+
+        It is useful when there are a few DAG files requiring longer parsing times, while others do not.
+        You can control them separately instead of having one value for all DAG files.
+
+        If the return value is less than or equal to 0, it means no timeout during the DAG parsing.
+        """
+
+
+

Review comment:
       ```suggestion
   ```
   
   nit: one less empty line to be consistent

##########
File path: docs/apache-airflow/faq.rst
##########
@@ -119,6 +119,30 @@ How do I trigger tasks based on another task's failure?
 
 You can achieve this with :ref:`concepts:trigger-rules`.
 
+How to control DAG file parsing timeout for different DAG files?
+---------------------------------------------------------------------------------
+(only valid for Airflow >= 2.3.0)
+
+You can add a function ``get_dagbag_import_timeout`` in the ``airflow_local_settings.py``. This function gets

Review comment:
       ```suggestion
   You can add a ``get_dagbag_import_timeout`` function in your ``airflow_local_settings.py`` which gets
   ```







[GitHub] [airflow] jedcunningham commented on a change in pull request #21501: Support different timeout value for dag file parsing

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #21501:
URL: https://github.com/apache/airflow/pull/21501#discussion_r807347602



##########
File path: docs/apache-airflow/faq.rst
##########
@@ -119,6 +119,34 @@ How do I trigger tasks based on another task's failure?
 
 You can achieve this with :ref:`concepts:trigger-rules`.
 
+How to control DAG file parsing timeout for different DAG files?
+----------------------------------------------------------------
+(only valid for Airflow >= 2.3.0)

Review comment:
       ```suggestion
   
   (only valid for Airflow >= 2.3.0)
   ```







[GitHub] [airflow] jedcunningham commented on a change in pull request #21501: Support different timeout value for dag file parsing

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #21501:
URL: https://github.com/apache/airflow/pull/21501#discussion_r804348257



##########
File path: tests/models/test_dagbag.py
##########
@@ -206,6 +207,49 @@ def test_zip(self):
         assert dagbag.get_dag("test_zip_dag")
         assert sys.path == syspath_before  # sys.path doesn't change
 
+    @patch("airflow.models.dagbag.timeout")
+    @patch("airflow.models.dagbag.settings.get_dagbag_import_timeout")
+    def test_process_dag_file_without_timeout(self, mocked_get_dagbag_import_timeout, mocked_timeout):
+        """
+        Test dag file parsing without timeout
+        """
+        mocked_get_dagbag_import_timeout.return_value = 0

Review comment:
       We should also check with a negative, no?
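
For reference, the behaviour under discussion (zero or negative disables the timeout, non-numeric values are rejected) could be sketched as below. `resolve_import_timeout` is a hypothetical helper written for this illustration, not the code in `airflow/models/dagbag.py`; only the error message is taken from the test in this PR.

```python
from typing import Optional, Union


def resolve_import_timeout(value: object) -> Optional[Union[int, float]]:
    """Hypothetical helper: validate a value returned by get_dagbag_import_timeout.

    Returns None when parsing should run without a timeout.
    """
    if not isinstance(value, (int, float)):
        raise TypeError(f"Value ({value}) from get_dagbag_import_timeout must be int or float")
    # Zero and negative values both mean "no timeout".
    if value <= 0:
        return None
    return value


# The boundary cases a test would want to cover, plus a normal value.
for candidate in (0, -1, -0.5):
    assert resolve_import_timeout(candidate) is None
assert resolve_import_timeout(100) == 100
```

Covering both `0` and a negative value guards against the comparison being written as `< 0` or `== 0` by mistake.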







[GitHub] [airflow] pingzh commented on pull request #21501: Support different timeout value for dag file parsing

Posted by GitBox <gi...@apache.org>.
pingzh commented on pull request #21501:
URL: https://github.com/apache/airflow/pull/21501#issuecomment-1039823521


   @jedcunningham could you please take a look?








[GitHub] [airflow] github-actions[bot] commented on pull request #21501: Support different timeout value for dag file parsing

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #21501:
URL: https://github.com/apache/airflow/pull/21501#issuecomment-1035404183


   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] jedcunningham commented on a change in pull request #21501: Support different timeout value for dag file parsing

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #21501:
URL: https://github.com/apache/airflow/pull/21501#discussion_r804344322



##########
File path: airflow/settings.py
##########
@@ -209,6 +209,18 @@ def get_airflow_context_vars(context):
     return {}
 
 
+def get_dagbag_import_timeout(dag_file_path: str) -> float:
+    """
+    This setting allows to dynamically control the dag file parsing timeout.
+
+    It is useful when there are a few dag files, requiring long parsing time while others not

Review comment:
       ```suggestion
       It is useful when there are a few DAG files requiring longer parsing times, while others do not.
   ```

##########
File path: tests/models/test_dagbag.py
##########
@@ -206,6 +207,49 @@ def test_zip(self):
         assert dagbag.get_dag("test_zip_dag")
         assert sys.path == syspath_before  # sys.path doesn't change
 
+    @patch("airflow.models.dagbag.timeout")
+    @patch("airflow.models.dagbag.settings.get_dagbag_import_timeout")
+    def test_process_dag_file_without_timeout(self, mocked_get_dagbag_import_timeout, mocked_timeout):
+        """
+        Test dag file parsing without timeout
+        """
+        mocked_get_dagbag_import_timeout.return_value = 0

Review comment:
       Should we also check with a negative value?

##########
File path: tests/models/test_dagbag.py
##########
@@ -206,6 +207,49 @@ def test_zip(self):
         assert dagbag.get_dag("test_zip_dag")
         assert sys.path == syspath_before  # sys.path doesn't change
 
+    @patch("airflow.models.dagbag.timeout")
+    @patch("airflow.models.dagbag.settings.get_dagbag_import_timeout")
+    def test_process_dag_file_without_timeout(self, mocked_get_dagbag_import_timeout, mocked_timeout):
+        """
+        Test dag file parsing without timeout
+        """
+        mocked_get_dagbag_import_timeout.return_value = 0
+
+        dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=False)
+        dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, 'test_default_views.py'))
+        mocked_timeout.assert_not_called()
+
+    @patch("airflow.models.dagbag.timeout")
+    @patch("airflow.models.dagbag.settings.get_dagbag_import_timeout")
+    def test_process_dag_file_with_non_default_timeout(
+        self, mocked_get_dagbag_import_timeout, mocked_timeout
+    ):
+        """
+        Test customized dag file parsing timeout
+        """
+        timeout_value = 100
+        mocked_get_dagbag_import_timeout.return_value = timeout_value
+
+        # ensure the test value is not equal to the default value
+        assert timeout_value != settings.conf.getfloat('core', 'DAGBAG_IMPORT_TIMEOUT')
+
+        dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=False)
+        dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, 'test_default_views.py'))
+
+        mocked_timeout.assert_called_once_with(timeout_value, error_message=mock.ANY)
+
+    @patch("airflow.models.dagbag.settings.get_dagbag_import_timeout")
+    def test_check_value_type_from_get_dagbag_import_timeout(self, mocked_get_dagbag_import_timeout):
+        """
+        Test correctness of value from get_dagbag_import_timeout
+        """
+        mocked_get_dagbag_import_timeout.return_value = '1'

Review comment:
       ```suggestion
           mocked_get_dagbag_import_timeout.return_value = 0.1
   ```
   Faster, and the right type?

##########
File path: airflow/settings.py
##########
@@ -209,6 +209,18 @@ def get_airflow_context_vars(context):
     return {}
 
 
+def get_dagbag_import_timeout(dag_file_path: str) -> float:
+    """
+    This setting allows to dynamically control the dag file parsing timeout.
+
+    It is useful when there are a few dag files, requiring long parsing time while others not
+    You can control them separately instead of having one value for all dag files.

Review comment:
       ```suggestion
       You can control them separately instead of having one value for all DAG files.
   ```

##########
File path: airflow/settings.py
##########
@@ -209,6 +209,18 @@ def get_airflow_context_vars(context):
     return {}
 
 
+def get_dagbag_import_timeout(dag_file_path: str) -> float:
+    """
+    This setting allows to dynamically control the dag file parsing timeout.
+
+    It is useful when there are a few dag files, requiring long parsing time while others not
+    You can control them separately instead of having one value for all dag files.
+
+    If the return value is less than 0, it means no timeout during the dag parsing.

Review comment:
       ```suggestion
       If the return value is less than or equal to 0, it means no timeout while parsing.
   ```
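
A minimal sketch of the "less than or equal to 0 means no timeout" rule described in the suggestion above. The `timeout` context manager here is a hypothetical stand-in that only records its argument, and `parse_with_optional_timeout` is an illustrative name; neither is taken from the Airflow code base.

```python
from contextlib import contextmanager, nullcontext

# Records every timeout value that was actually armed (for illustration only).
calls = []


@contextmanager
def timeout(seconds, error_message="timed out"):
    # Hypothetical stand-in for a real timeout guard: it only records the value.
    calls.append(seconds)
    yield


def parse_with_optional_timeout(parse, timeout_seconds):
    """Run ``parse`` under a timeout guard only when the value is positive."""
    # A value <= 0 is treated as "no timeout", so no guard is armed at all.
    guard = timeout(timeout_seconds) if timeout_seconds > 0 else nullcontext()
    with guard:
        return parse()
```

With this shape, both `0` and negative values skip the guard entirely, which is what a test asserting that the timeout was never called would observe.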

##########
File path: airflow/settings.py
##########
@@ -209,6 +209,18 @@ def get_airflow_context_vars(context):
     return {}
 
 
+def get_dagbag_import_timeout(dag_file_path: str) -> float:

Review comment:
       ```suggestion
   def get_dagbag_import_timeout(dag_file_path: str) -> Union[int, float]:
   ```
   

##########
File path: airflow/settings.py
##########
@@ -209,6 +209,18 @@ def get_airflow_context_vars(context):
     return {}
 
 
+def get_dagbag_import_timeout(dag_file_path: str) -> float:
+    """
+    This setting allows to dynamically control the dag file parsing timeout.

Review comment:
       ```suggestion
       This setting allows for dynamic control of the dag file parsing timeout based on the DAG file path.
   ```







[GitHub] [airflow] ashb commented on a change in pull request #21501: Support different timeout value for dag file parsing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21501:
URL: https://github.com/apache/airflow/pull/21501#discussion_r807364492



##########
File path: docs/apache-airflow/faq.rst
##########
@@ -119,6 +119,34 @@ How do I trigger tasks based on another task's failure?
 
 You can achieve this with :ref:`concepts:trigger-rules`.
 
+How to control DAG file parsing timeout for different DAG files?
+----------------------------------------------------------------
+(only valid for Airflow >= 2.3.0)

Review comment:
       There's only one other use -- your call.







[GitHub] [airflow] jedcunningham merged pull request #21501: Support different timeout value for dag file parsing

Posted by GitBox <gi...@apache.org>.
jedcunningham merged pull request #21501:
URL: https://github.com/apache/airflow/pull/21501


   





[GitHub] [airflow] pingzh commented on pull request #21501: Support different timeout value for dag file parsing

Posted by GitBox <gi...@apache.org>.
pingzh commented on pull request #21501:
URL: https://github.com/apache/airflow/pull/21501#issuecomment-1039823521


   @jedcunningham could you please take a look?





[GitHub] [airflow] jedcunningham commented on a change in pull request #21501: Support different timeout value for dag file parsing

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #21501:
URL: https://github.com/apache/airflow/pull/21501#discussion_r806484800



##########
File path: docs/apache-airflow/faq.rst
##########
@@ -119,6 +119,30 @@ How do I trigger tasks based on another task's failure?
 
 You can achieve this with :ref:`concepts:trigger-rules`.
 
+How to control DAG file parsing timeout for different DAG files?
+---------------------------------------------------------------------------------
+(only valid for Airflow >= 2.3.0)
+
+You can add a function ``get_dagbag_import_timeout`` in the ``airflow_local_settings.py``. This function gets
+called right before a DAG file is parsed. You can return different timeout value based on the DAG file.

Review comment:
       ```suggestion
   called right before a DAG file is parsed. You can return a different timeout value based on the DAG file.
   ```

##########
File path: docs/apache-airflow/faq.rst
##########
@@ -119,6 +119,30 @@ How do I trigger tasks based on another task's failure?
 
 You can achieve this with :ref:`concepts:trigger-rules`.
 
+How to control DAG file parsing timeout for different DAG files?
+---------------------------------------------------------------------------------
+(only valid for Airflow >= 2.3.0)

Review comment:
       ```suggestion
   How to control DAG file parsing timeout for different DAG files?
   ----------------------------------------------------------------
   
   (only valid for Airflow >= 2.3.0)
   ```
   
   nit

##########
File path: airflow/settings.py
##########
@@ -209,6 +209,18 @@ def get_airflow_context_vars(context):
     return {}
 
 
+def get_dagbag_import_timeout(dag_file_path: str) -> float:
+    """
+    This setting allows to dynamically control the dag file parsing timeout.

Review comment:
       Was this missed?

##########
File path: docs/apache-airflow/faq.rst
##########
@@ -119,6 +119,30 @@ How do I trigger tasks based on another task's failure?
 
 You can achieve this with :ref:`concepts:trigger-rules`.
 
+How to control DAG file parsing timeout for different DAG files?
+---------------------------------------------------------------------------------
+(only valid for Airflow >= 2.3.0)
+
+You can add a function ``get_dagbag_import_timeout`` in the ``airflow_local_settings.py``. This function gets
+called right before a DAG file is parsed. You can return different timeout value based on the DAG file.
+When the return value is less than or equal to 0, it means no timeout during the DAG parsing.
+
+.. code-block:: python
+   :caption: airflow_local_settings.py
+   :name: airflow_local_settings.py
+
+    def get_dagbag_import_timeout(dag_file_path: str) -> Union[int, float]:
+        """
+        This setting allows to dynamically control the DAG file parsing timeout.
+
+        It is useful when there are a few DAG files requiring longer parsing times, while others do not.
+        You can control them separately instead of having one value for all DAG files.
+
+        If the return value is less than or equal to 0, it means no timeout during the DAG parsing.
+        """

Review comment:
       Maybe have a simple example instead?
   ```suggestion
           if "slow" in dag_file_path:
               return 90
           return conf.getfloat('core', 'DAGBAG_IMPORT_TIMEOUT')
   ```
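
Expanding on the suggestion above, here is a self-contained sketch of what such a hook might look like in `airflow_local_settings.py`. The constant `DEFAULT_TIMEOUT_SECONDS` and the `"no_timeout"` path marker are assumptions made for illustration; a real deployment would more likely read the default from `conf.getfloat('core', 'DAGBAG_IMPORT_TIMEOUT')` as in the suggestion.

```python
from typing import Union

# Hypothetical fallback value so the sketch runs without Airflow installed.
# In a real settings file this would likely come from the Airflow config.
DEFAULT_TIMEOUT_SECONDS = 30.0


def get_dagbag_import_timeout(dag_file_path: str) -> Union[int, float]:
    """Return a per-file parsing timeout in seconds based on the DAG file path."""
    # Assumed convention: files marked "no_timeout" are parsed without a limit,
    # since a return value <= 0 means no timeout.
    if "no_timeout" in dag_file_path:
        return 0
    # Files known to be slow get a longer budget.
    if "slow" in dag_file_path:
        return 90
    return DEFAULT_TIMEOUT_SECONDS
```

Matching on a substring of the path keeps the example short; a stricter rule (for example, matching a directory prefix) may be preferable in practice.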

##########
File path: docs/apache-airflow/faq.rst
##########
@@ -119,6 +119,30 @@ How do I trigger tasks based on another task's failure?
 
 You can achieve this with :ref:`concepts:trigger-rules`.
 
+How to control DAG file parsing timeout for different DAG files?
+---------------------------------------------------------------------------------
+(only valid for Airflow >= 2.3.0)
+
+You can add a function ``get_dagbag_import_timeout`` in the ``airflow_local_settings.py``. This function gets
+called right before a DAG file is parsed. You can return different timeout value based on the DAG file.
+When the return value is less than or equal to 0, it means no timeout during the DAG parsing.
+
+.. code-block:: python
+   :caption: airflow_local_settings.py
+   :name: airflow_local_settings.py
+
+    def get_dagbag_import_timeout(dag_file_path: str) -> Union[int, float]:
+        """
+        This setting allows to dynamically control the DAG file parsing timeout.
+
+        It is useful when there are a few DAG files requiring longer parsing times, while others do not.
+        You can control them separately instead of having one value for all DAG files.
+
+        If the return value is less than or equal to 0, it means no timeout during the DAG parsing.
+        """
+
+
+

Review comment:
       ```suggestion
   ```
   
   nit: one less empty line to be consistent

##########
File path: docs/apache-airflow/faq.rst
##########
@@ -119,6 +119,30 @@ How do I trigger tasks based on another task's failure?
 
 You can achieve this with :ref:`concepts:trigger-rules`.
 
+How to control DAG file parsing timeout for different DAG files?
+---------------------------------------------------------------------------------
+(only valid for Airflow >= 2.3.0)
+
+You can add a function ``get_dagbag_import_timeout`` in the ``airflow_local_settings.py``. This function gets

Review comment:
       ```suggestion
   You can add a ``get_dagbag_import_timeout`` function in your ``airflow_local_settings.py`` which gets
   ```







[GitHub] [airflow] jedcunningham commented on pull request #21501: Support different timeout value for dag file parsing

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on pull request #21501:
URL: https://github.com/apache/airflow/pull/21501#issuecomment-1035881416


   This might be a good place: https://airflow.apache.org/docs/apache-airflow/stable/faq.html?highlight=parsing#scheduling-dag-file-parsing





[GitHub] [airflow] jedcunningham commented on pull request #21501: Support different timeout value for dag file parsing

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on pull request #21501:
URL: https://github.com/apache/airflow/pull/21501#issuecomment-1035871896


   Oh, and I think we should also add a small section somewhere in the docs.





[GitHub] [airflow] ashb commented on a change in pull request #21501: Support different timeout value for dag file parsing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21501:
URL: https://github.com/apache/airflow/pull/21501#discussion_r804640132



##########
File path: airflow/models/dagbag.py
##########
@@ -332,7 +326,26 @@ def _load_modules_from_file(self, filepath, safe_mode):
                     )
                 else:
                     self.import_errors[filepath] = str(e)
-        return []
+                return []
+
+        dagbag_import_timeout = settings.get_dagbag_import_timeout(filepath)
+
+        if not (isinstance(dagbag_import_timeout, int) or isinstance(dagbag_import_timeout, float)):

Review comment:
       ```suggestion
           if not isinstance(dagbag_import_timeout, (int, float)):
   ```
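
A short sketch of the tuple form of `isinstance` suggested above, wrapped in a small validator. The function name `validate_timeout` and the choice of `TypeError` are assumptions for illustration and are not taken from the PR.

```python
from typing import Union


def validate_timeout(value: object) -> Union[int, float]:
    """Return ``value`` unchanged if it is an int or float, else raise TypeError."""
    # isinstance accepts a tuple of types, so one call replaces the two
    # isinstance checks joined with "or". Note that bool is a subclass of int,
    # so True/False would also pass this check.
    if not isinstance(value, (int, float)):
        raise TypeError(
            f"Expected int or float, got {type(value).__name__}: {value!r}"
        )
    return value
```

A string such as `'1'` is rejected here, while `0.1` and `100` pass through unchanged.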







[GitHub] [airflow] ashb commented on a change in pull request #21501: Support different timeout value for dag file parsing

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21501:
URL: https://github.com/apache/airflow/pull/21501#discussion_r804637903



##########
File path: airflow/models/dagbag.py
##########
@@ -310,13 +310,7 @@ def _load_modules_from_file(self, filepath, safe_mode):
         if mod_name in sys.modules:
             del sys.modules[mod_name]
 
-        timeout_msg = (
-            f"DagBag import timeout for {filepath} after {self.DAGBAG_IMPORT_TIMEOUT}s.\n"
-            "Please take a look at these docs to improve your DAG import time:\n"
-            f"* {get_docs_url('best-practices.html#top-level-python-code')}\n"
-            f"* {get_docs_url('best-practices.html#reducing-dag-complexity')}"
-        )
-        with timeout(self.DAGBAG_IMPORT_TIMEOUT, error_message=timeout_msg):

Review comment:
       This class constant isn't used anymore either, is it? If not, we should remove it.







[GitHub] [airflow] pingzh commented on a change in pull request #21501: Support different timeout value for dag file parsing

Posted by GitBox <gi...@apache.org>.
pingzh commented on a change in pull request #21501:
URL: https://github.com/apache/airflow/pull/21501#discussion_r807353056



##########
File path: docs/apache-airflow/faq.rst
##########
@@ -119,6 +119,34 @@ How do I trigger tasks based on another task's failure?
 
 You can achieve this with :ref:`concepts:trigger-rules`.
 
+How to control DAG file parsing timeout for different DAG files?
+----------------------------------------------------------------
+(only valid for Airflow >= 2.3.0)

Review comment:
       @ashb It looks like the `(only valid for Airflow` format is already used elsewhere in this doc.
   
   Let me know your thoughts. Thanks.







[GitHub] [airflow] jedcunningham commented on a change in pull request #21501: Support different timeout value for dag file parsing

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #21501:
URL: https://github.com/apache/airflow/pull/21501#discussion_r804345052



##########
File path: airflow/settings.py
##########
@@ -209,6 +209,18 @@ def get_airflow_context_vars(context):
     return {}
 
 
+def get_dagbag_import_timeout(dag_file_path: str) -> float:
+    """
+    This setting allows to dynamically control the dag file parsing timeout.

Review comment:
       ```suggestion
       This setting allows for dynamic control of the dag file parsing timeout based on the DAG file path.
   ```



