You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by da...@apache.org on 2016/10/21 19:56:20 UTC
incubator-airflow git commit: [AIRFLOW-385] Add symlink to latest
scheduler log directory
Repository: incubator-airflow
Updated Branches:
refs/heads/master e4cca0de2 -> 8911903f9
[AIRFLOW-385] Add symlink to latest scheduler log directory
Create a symbolic link to the directory containing
the latest scheduler logs, and update the link
when the target changes.
Update the test_scheduler_job test case to verify
that the symbolic link is created.
Implementation:
- Create a symbolic link to the directory
containing the latest scheduler logs, and update
the link when the target changes.
Testing Done:
- Extend test_scheduler_job test case to verify
that the correct symbolic link is created.
Closes #1842 from vijaysbhat/latest-log-symlink
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8911903f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8911903f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8911903f
Branch: refs/heads/master
Commit: 8911903f9135efd725bb2285500b20e54a34e3b8
Parents: e4cca0d
Author: Vijay Bhat <vi...@gmail.com>
Authored: Fri Oct 21 12:55:55 2016 -0700
Committer: Dan Davydov <da...@airbnb.com>
Committed: Fri Oct 21 12:55:58 2016 -0700
----------------------------------------------------------------------
airflow/utils/dag_processing.py | 42 +++++++++++++++++++++++++++++++++---
tests/core.py | 11 ++++++++++
2 files changed, 50 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8911903f/airflow/utils/dag_processing.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index fc4ca1b..aa502fb 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -465,6 +465,18 @@ class DagFileProcessorManager(LoggingMixin):
results.reverse()
return results
+ def _get_log_directory(self):
+ """
+ Log output from processing DAGs for the current day should go into
+ this directory.
+
+ :return: the path to the corresponding log directory
+ :rtype: unicode
+ """
+ now = datetime.now()
+ return os.path.join(self._child_process_log_directory,
+ now.strftime("%Y-%m-%d"))
+
def _get_log_file_path(self, dag_file_path):
"""
Log output from processing the specified file should go to this
@@ -475,11 +487,9 @@ class DagFileProcessorManager(LoggingMixin):
:return: the path to the corresponding log file
:rtype: unicode
"""
+ log_directory = self._get_log_directory()
# General approach is to put the log file under the same relative path
# under the log directory as the DAG file in the DAG directory
- now = datetime.now()
- log_directory = os.path.join(self._child_process_log_directory,
- now.strftime("%Y-%m-%d"))
relative_dag_file_path = os.path.relpath(dag_file_path, start=self._dag_directory)
path_elements = self._split_path(relative_dag_file_path)
@@ -488,6 +498,30 @@ class DagFileProcessorManager(LoggingMixin):
return os.path.join(log_directory, *path_elements)
+ def symlink_latest_log_directory(self):
+ """
+ Create symbolic link to the current day's log directory to
+ allow easy access to the latest scheduler log files.
+
+ :return: None
+ """
+ log_directory = self._get_log_directory()
+ latest_log_directory_path = os.path.join(
+ self._child_process_log_directory, "latest")
+ if (os.path.isdir(log_directory)):
+ # if symlink exists but is stale, update it
+ if (os.path.islink(latest_log_directory_path)):
+ if(os.readlink(latest_log_directory_path) != log_directory):
+ os.unlink(latest_log_directory_path)
+ os.symlink(log_directory, latest_log_directory_path)
+ elif (os.path.isdir(latest_log_directory_path) or
+ os.path.isfile(latest_log_directory_path)):
+ self.logger.warn("{} already exists as a dir/file. "
+ "Skip creating symlink."
+ .format(latest_log_directory_path))
+ else:
+ os.symlink(log_directory, latest_log_directory_path)
+
def processing_count(self):
"""
:return: the number of files currently being processed
@@ -592,6 +626,8 @@ class DagFileProcessorManager(LoggingMixin):
self._processors[file_path] = processor
+ self.symlink_latest_log_directory()
+
return simple_dags
def max_runs_reached(self):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8911903f/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 626c093..d4a8fa9 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -597,10 +597,21 @@ class CoreTest(unittest.TestCase):
job = jobs.LocalTaskJob(task_instance=ti, ignore_ti_state=True)
job.run()
+ @mock.patch('airflow.utils.dag_processing.datetime', FakeDatetime)
def test_scheduler_job(self):
+ FakeDatetime.now = classmethod(lambda cls: datetime(2016, 1, 1))
job = jobs.SchedulerJob(dag_id='example_bash_operator',
**self.default_scheduler_args)
job.run()
+ log_base_directory = configuration.conf.get("scheduler",
+ "child_process_log_directory")
+ latest_log_directory_path = os.path.join(log_base_directory, "latest")
+ # verify that the symlink to the latest logs exists
+ assert os.path.islink(latest_log_directory_path)
+
+ # verify that the symlink points to the correct log directory
+ log_directory = os.path.join(log_base_directory, "2016-01-01")
+ self.assertEqual(os.readlink(latest_log_directory_path), log_directory)
def test_raw_job(self):
TI = models.TaskInstance