You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/01/28 12:40:19 UTC

incubator-airflow git commit: [AIRFLOW-813] Fix unterminated unit tests in SchedulerJobTest

Repository: incubator-airflow
Updated Branches:
  refs/heads/master cf6d50c0e -> 9d9e56dc3


[AIRFLOW-813] Fix unterminated unit tests in SchedulerJobTest

Closes #2030 from fenglu-g/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9d9e56dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9d9e56dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9d9e56dc

Branch: refs/heads/master
Commit: 9d9e56dc3cbec256422249d382aedd29d25c46a3
Parents: cf6d50c
Author: Feng Lu <fe...@fengcloud.hot.corp.google.com>
Authored: Sat Jan 28 13:39:59 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sat Jan 28 13:40:03 2017 +0100

----------------------------------------------------------------------
 airflow/utils/dag_processing.py | 9 +++++++--
 tests/jobs.py                   | 6 ++----
 2 files changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9d9e56dc/airflow/utils/dag_processing.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index ef1c1ed..6ed5db7 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -354,6 +354,8 @@ class DagFileProcessorManager(LoggingMixin):
         self._last_finish_time = {}
         # Map from file path to the number of runs
         self._run_count = defaultdict(int)
+        # Scheduler heartbeat key.
+        self._heart_beat_key = 'heart-beat'
 
     @property
     def file_paths(self):
@@ -628,17 +630,20 @@ class DagFileProcessorManager(LoggingMixin):
 
         self.symlink_latest_log_directory()
 
+        # Update scheduler heartbeat count.
+        self._run_count[self._heart_beat_key] += 1
+
         return simple_dags
 
     def max_runs_reached(self):
         """
         :return: whether all file paths have been processed max_runs times
         """
-        if not self._file_paths:  # No dag file is present.
-            return False
         for file_path in self._file_paths:
             if self._run_count[file_path] != self._max_runs:
                 return False
+        if self._run_count[self._heart_beat_key] < self._max_runs:
+            return False
         return True
 
     def terminate(self):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9d9e56dc/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 1872266..b674bcd 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -267,8 +267,7 @@ class SchedulerJobTest(unittest.TestCase):
         Utility function that runs a single scheduler loop without actually
         changing/scheduling any dags. This is useful to simulate the other side effects of
         running a scheduler loop, e.g. to see what parse errors there are in the
-        dags_folder. The run_duration is limited to 20 seconds as the scheduler
-        will run forever as num_runs is ignored when there is no dag file.
+        dags_folder.
 
         :param dags_folder: the directory to traverse
         :type directory: str
@@ -276,8 +275,7 @@ class SchedulerJobTest(unittest.TestCase):
         scheduler = SchedulerJob(
             dag_id='this_dag_doesnt_exist',  # We don't want to actually run anything
             num_runs=1,
-            subdir=os.path.join(dags_folder),
-            run_duration=20)
+            subdir=os.path.join(dags_folder))
         scheduler.heartrate = 0
         scheduler.run()