Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/08/21 21:55:59 UTC

[GitHub] [airflow] xinbinhuang commented on a change in pull request #17769: Fix dag_processing.last_duration metric random holes

xinbinhuang commented on a change in pull request #17769:
URL: https://github.com/apache/airflow/pull/17769#discussion_r693405701



##########
File path: tests/dag_processing/test_manager.py
##########
@@ -694,6 +695,48 @@ def fake_processor_(*args, **kwargs):
             child_pipe.close()
             thread.join(timeout=1.0)
 
+    @conf_vars({('core', 'load_examples'): 'False'})
+    @mock.patch('airflow.dag_processing.manager.Stats.timing')
+    def test_send_file_processing_statsd_timing(self, statsd_timing_mock, tmpdir):
+        # arrange
+        filename_to_parse = tmpdir / 'temp_dag.py'
+        # Generate dag
+        dag_code = dedent(
+            """
+        from airflow import DAG
+        dag = DAG(dag_id='temp_dag', schedule_interval='0 0 * * *')
+        """
+        )
+        with open(filename_to_parse, 'w') as file_to_parse:
+            file_to_parse.writelines(dag_code)
+
+        child_pipe, parent_pipe = multiprocessing.Pipe()
+
+        async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')
+        manager = DagFileProcessorManager(
+            dag_directory=tmpdir,
+            max_runs=1,
+            processor_timeout=timedelta.max,
+            signal_conn=child_pipe,
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=async_mode,
+        )
+
+        # act

Review comment:
       ```suggestion
   ```

##########
File path: tests/dag_processing/test_manager.py
##########
@@ -694,6 +695,48 @@ def fake_processor_(*args, **kwargs):
             child_pipe.close()
             thread.join(timeout=1.0)
 
+    @conf_vars({('core', 'load_examples'): 'False'})
+    @mock.patch('airflow.dag_processing.manager.Stats.timing')
+    def test_send_file_processing_statsd_timing(self, statsd_timing_mock, tmpdir):
+        # arrange

Review comment:
       ```suggestion
   ```

##########
File path: tests/dag_processing/test_manager.py
##########
@@ -694,6 +695,48 @@ def fake_processor_(*args, **kwargs):
             child_pipe.close()
             thread.join(timeout=1.0)
 
+    @conf_vars({('core', 'load_examples'): 'False'})
+    @mock.patch('airflow.dag_processing.manager.Stats.timing')
+    def test_send_file_processing_statsd_timing(self, statsd_timing_mock, tmpdir):
+        # arrange
+        filename_to_parse = tmpdir / 'temp_dag.py'
+        # Generate dag

Review comment:
       ```suggestion
   ```

##########
File path: tests/dag_processing/test_manager.py
##########
@@ -694,6 +695,48 @@ def fake_processor_(*args, **kwargs):
             child_pipe.close()
             thread.join(timeout=1.0)
 
+    @conf_vars({('core', 'load_examples'): 'False'})
+    @mock.patch('airflow.dag_processing.manager.Stats.timing')
+    def test_send_file_processing_statsd_timing(self, statsd_timing_mock, tmpdir):
+        # arrange
+        filename_to_parse = tmpdir / 'temp_dag.py'
+        # Generate dag
+        dag_code = dedent(
+            """
+        from airflow import DAG
+        dag = DAG(dag_id='temp_dag', schedule_interval='0 0 * * *')
+        """
+        )
+        with open(filename_to_parse, 'w') as file_to_parse:
+            file_to_parse.writelines(dag_code)
+
+        child_pipe, parent_pipe = multiprocessing.Pipe()
+
+        async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')
+        manager = DagFileProcessorManager(
+            dag_directory=tmpdir,
+            max_runs=1,
+            processor_timeout=timedelta.max,
+            signal_conn=child_pipe,
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=async_mode,
+        )
+
+        # act
+        with create_session():
+            self.run_processor_manager_one_loop(manager, parent_pipe)
+
+        child_pipe.close()
+        parent_pipe.close()
+
+        # assert
+        # we check that after processing the file and removing it from DagFileProcessorManager._processors,
+        # the statistics on the last processing was sent to the statsd

Review comment:
       ```suggestion
   ```
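
The comment lines flagged above describe what the test is meant to verify: that a timing metric for the last processing run reaches statsd. A minimal sketch of that kind of assertion follows, written without Airflow so it runs on its own. `FakeStats` and `report_last_duration` are hypothetical stand-ins, not Airflow APIs, and the metric name `dag_processing.last_duration.temp_dag` is an assumption based on the PR title and the `temp_dag` file name in the diff.

```python
from datetime import timedelta
from unittest import mock


class FakeStats:
    """Hypothetical stand-in for a statsd client; not Airflow's Stats class."""

    @staticmethod
    def timing(metric, duration):
        raise RuntimeError("a real client would send the metric over the network")


def report_last_duration(file_name, duration):
    """Hypothetical helper: emit the duration of the last parse of one file."""
    FakeStats.timing(f"dag_processing.last_duration.{file_name}", duration)


# Patch the timing call so nothing is sent and every call is recorded.
with mock.patch.object(FakeStats, "timing") as timing_mock:
    report_last_duration("temp_dag", timedelta(seconds=1.5))

# The duration of a real run is not deterministic, so match it with mock.ANY.
timing_mock.assert_called_with("dag_processing.last_duration.temp_dag", mock.ANY)
```

With the mock in place the assertion states the intent directly, which may be why the explanatory comments were suggested for removal.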



