You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by da...@apache.org on 2016/10/21 19:56:20 UTC

incubator-airflow git commit: [AIRFLOW-385] Add symlink to latest scheduler log directory

Repository: incubator-airflow
Updated Branches:
  refs/heads/master e4cca0de2 -> 8911903f9


[AIRFLOW-385] Add symlink to latest scheduler log directory

Create a symbolic link to the directory containing
the latest scheduler logs, and update the link
when the target changes.

Update the test_scheduler_job test case to verify
that the symbolic link is created.

Implementation:
- Create a symbolic link to the directory
containing the latest scheduler logs, and update
the link when the target changes.

Testing Done:
- Extend test_scheduler_job test case to verify
that the correct symbolic link is created.

Closes #1842 from vijaysbhat/latest-log-symlink


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8911903f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8911903f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8911903f

Branch: refs/heads/master
Commit: 8911903f9135efd725bb2285500b20e54a34e3b8
Parents: e4cca0d
Author: Vijay Bhat <vi...@gmail.com>
Authored: Fri Oct 21 12:55:55 2016 -0700
Committer: Dan Davydov <da...@airbnb.com>
Committed: Fri Oct 21 12:55:58 2016 -0700

----------------------------------------------------------------------
 airflow/utils/dag_processing.py | 42 +++++++++++++++++++++++++++++++++---
 tests/core.py                   | 11 ++++++++++
 2 files changed, 50 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8911903f/airflow/utils/dag_processing.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index fc4ca1b..aa502fb 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -465,6 +465,18 @@ class DagFileProcessorManager(LoggingMixin):
         results.reverse()
         return results
 
+    def _get_log_directory(self):
+        """
+        Log output from processing DAGs for the current day should go into
+        this directory.
+
+        :return: the path to the corresponding log directory
+        :rtype: unicode
+        """
+        now = datetime.now()
+        return os.path.join(self._child_process_log_directory,
+            now.strftime("%Y-%m-%d"))
+
     def _get_log_file_path(self, dag_file_path):
         """
         Log output from processing the specified file should go to this
@@ -475,11 +487,9 @@ class DagFileProcessorManager(LoggingMixin):
         :return: the path to the corresponding log file
         :rtype: unicode
         """
+        log_directory = self._get_log_directory()
         # General approach is to put the log file under the same relative path
         # under the log directory as the DAG file in the DAG directory
-        now = datetime.now()
-        log_directory = os.path.join(self._child_process_log_directory,
-                                     now.strftime("%Y-%m-%d"))
         relative_dag_file_path = os.path.relpath(dag_file_path, start=self._dag_directory)
         path_elements = self._split_path(relative_dag_file_path)
 
@@ -488,6 +498,30 @@ class DagFileProcessorManager(LoggingMixin):
 
         return os.path.join(log_directory, *path_elements)
 
+    def symlink_latest_log_directory(self):
+        """
+        Create symbolic link to the current day's log directory to
+        allow easy access to the latest scheduler log files.
+
+        :return: None
+        """
+        log_directory = self._get_log_directory()
+        latest_log_directory_path = os.path.join(
+            self._child_process_log_directory, "latest")
+        if (os.path.isdir(log_directory)):
+            # if symlink exists but is stale, update it
+            if (os.path.islink(latest_log_directory_path)):
+                if(os.readlink(latest_log_directory_path) != log_directory):
+                    os.unlink(latest_log_directory_path)
+                    os.symlink(log_directory, latest_log_directory_path)
+            elif (os.path.isdir(latest_log_directory_path) or
+                    os.path.isfile(latest_log_directory_path)):
+                self.logger.warn("{} already exists as a dir/file. "
+                                "Skip creating symlink."
+                                    .format(latest_log_directory_path))
+            else:
+                os.symlink(log_directory, latest_log_directory_path)
+
     def processing_count(self):
         """
         :return: the number of files currently being processed
@@ -592,6 +626,8 @@ class DagFileProcessorManager(LoggingMixin):
 
             self._processors[file_path] = processor
 
+        self.symlink_latest_log_directory()
+
         return simple_dags
 
     def max_runs_reached(self):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8911903f/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 626c093..d4a8fa9 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -597,10 +597,21 @@ class CoreTest(unittest.TestCase):
         job = jobs.LocalTaskJob(task_instance=ti, ignore_ti_state=True)
         job.run()
 
+    @mock.patch('airflow.utils.dag_processing.datetime', FakeDatetime)
     def test_scheduler_job(self):
+        FakeDatetime.now = classmethod(lambda cls: datetime(2016, 1, 1))
         job = jobs.SchedulerJob(dag_id='example_bash_operator',
                                 **self.default_scheduler_args)
         job.run()
+        log_base_directory = configuration.conf.get("scheduler",
+            "child_process_log_directory")
+        latest_log_directory_path = os.path.join(log_base_directory, "latest")
+        # verify that the symlink to the latest logs exists
+        assert os.path.islink(latest_log_directory_path)
+
+        # verify that the symlink points to the correct log directory
+        log_directory = os.path.join(log_base_directory, "2016-01-01")
+        self.assertEqual(os.readlink(latest_log_directory_path), log_directory)
 
     def test_raw_job(self):
         TI = models.TaskInstance