You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/09/07 13:47:31 UTC

[GitHub] [airflow] seelmann commented on a change in pull request #5908: Revert "[AIRFLOW-4797] Improve performance and behaviour of zombie de…

seelmann commented on a change in pull request #5908: Revert "[AIRFLOW-4797] Improve performance and behaviour of zombie de…
URL: https://github.com/apache/airflow/pull/5908#discussion_r321969740
 
 

 ##########
 File path: tests/utils/test_dag_processing.py
 ##########
 @@ -169,6 +179,126 @@ def test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self):
         manager.set_file_paths(['abc.txt'])
         self.assertDictEqual(manager._processors, {'abc.txt': mock_processor})
 
+    def test_find_zombies(self):
+        manager = DagFileProcessorManager(
+            dag_directory='directory',
+            file_paths=['abc.txt'],
+            max_runs=1,
+            processor_factory=MagicMock().return_value,
+            processor_timeout=timedelta.max,
+            signal_conn=MagicMock(),
+            async_mode=True)
+
+        dagbag = DagBag(TEST_DAG_FOLDER)
+        with create_session() as session:
+            session.query(LJ).delete()
+            dag = dagbag.get_dag('example_branch_operator')
+            task = dag.get_task(task_id='run_this_first')
+
+            ti = TI(task, DEFAULT_DATE, State.RUNNING)
+            lj = LJ(ti)
+            lj.state = State.SHUTDOWN
+            lj.id = 1
+            ti.job_id = lj.id
+
+            session.add(lj)
+            session.add(ti)
+            session.commit()
+
+            manager._last_zombie_query_time = timezone.utcnow() - timedelta(
+                seconds=manager._zombie_threshold_secs + 1)
+            manager._find_zombies()
+            zombies = manager._zombies
+            self.assertEqual(1, len(zombies))
+            self.assertIsInstance(zombies[0], SimpleTaskInstance)
+            self.assertEqual(ti.dag_id, zombies[0].dag_id)
+            self.assertEqual(ti.task_id, zombies[0].task_id)
+            self.assertEqual(ti.execution_date, zombies[0].execution_date)
+
+            session.query(TI).delete()
+            session.query(LJ).delete()
+
+    def test_zombies_are_correctly_passed_to_dag_file_processor(self):
+        """
+        Check that the same set of zombies are passed to the dag
+        file processors until the next zombie detection logic is invoked.
+        :return:
+        """
+
+        dagbag = DagBag(os.path.join(TEST_DAG_FOLDER, 'test_example_bash_operator'), include_examples=False)
 
 Review comment:
   It seems the test requires `include_examples=True` in order to load the test DAG below.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services