You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/01/28 12:40:19 UTC
incubator-airflow git commit: [AIRFLOW-813] Fix unterminated unit
tests in SchedulerJobTest
Repository: incubator-airflow
Updated Branches:
refs/heads/master cf6d50c0e -> 9d9e56dc3
[AIRFLOW-813] Fix unterminated unit tests in SchedulerJobTest
Closes #2030 from fenglu-g/master
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9d9e56dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9d9e56dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9d9e56dc
Branch: refs/heads/master
Commit: 9d9e56dc3cbec256422249d382aedd29d25c46a3
Parents: cf6d50c
Author: Feng Lu <fe...@fengcloud.hot.corp.google.com>
Authored: Sat Jan 28 13:39:59 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sat Jan 28 13:40:03 2017 +0100
----------------------------------------------------------------------
airflow/utils/dag_processing.py | 9 +++++++--
tests/jobs.py | 6 ++----
2 files changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9d9e56dc/airflow/utils/dag_processing.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index ef1c1ed..6ed5db7 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -354,6 +354,8 @@ class DagFileProcessorManager(LoggingMixin):
self._last_finish_time = {}
# Map from file path to the number of runs
self._run_count = defaultdict(int)
+ # Scheduler heartbeat key.
+ self._heart_beat_key = 'heart-beat'
@property
def file_paths(self):
@@ -628,17 +630,20 @@ class DagFileProcessorManager(LoggingMixin):
self.symlink_latest_log_directory()
+ # Update scheduler heartbeat count.
+ self._run_count[self._heart_beat_key] += 1
+
return simple_dags
def max_runs_reached(self):
"""
:return: whether all file paths have been processed max_runs times
"""
- if not self._file_paths: # No dag file is present.
- return False
for file_path in self._file_paths:
if self._run_count[file_path] != self._max_runs:
return False
+ if self._run_count[self._heart_beat_key] < self._max_runs:
+ return False
return True
def terminate(self):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9d9e56dc/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 1872266..b674bcd 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -267,8 +267,7 @@ class SchedulerJobTest(unittest.TestCase):
Utility function that runs a single scheduler loop without actually
changing/scheduling any dags. This is useful to simulate the other side effects of
running a scheduler loop, e.g. to see what parse errors there are in the
- dags_folder. The run_duration is limited to 20 seconds as the scheduler
- will run forever as num_runs is ignored when there is no dag file.
+ dags_folder.
:param dags_folder: the directory to traverse
:type directory: str
@@ -276,8 +275,7 @@ class SchedulerJobTest(unittest.TestCase):
scheduler = SchedulerJob(
dag_id='this_dag_doesnt_exist', # We don't want to actually run anything
num_runs=1,
- subdir=os.path.join(dags_folder),
- run_duration=20)
+ subdir=os.path.join(dags_folder))
scheduler.heartrate = 0
scheduler.run()