You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/07/11 19:04:11 UTC

incubator-airflow git commit: [AIRFLOW-1357] Fix scheduler zip file support

Repository: incubator-airflow
Updated Branches:
  refs/heads/master f6f73b565 -> 3b863f182


[AIRFLOW-1357] Fix scheduler zip file support

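Zipped DAGs are supported by the models, but since 1.8 they
have not been taken into account by the scheduler: its file
listing (list_py_file_paths) skipped every file without a .py
extension. This fixes the issue by also accepting zip archives
and by skipping the safe-mode content heuristic for them.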

Closes #2406 from ultrabug/jira_1357


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3b863f18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3b863f18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3b863f18

Branch: refs/heads/master
Commit: 3b863f182d42dc1ba76c2623b9983698a682d5f7
Parents: f6f73b5
Author: Ultrabug <ul...@gentoo.org>
Authored: Tue Jul 11 21:03:43 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Jul 11 21:03:53 2017 +0200

----------------------------------------------------------------------
 airflow/utils/dag_processing.py |  5 +++--
 tests/jobs.py                   | 21 +++++++++++++++++++--
 2 files changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3b863f18/airflow/utils/dag_processing.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index cdcc7b0..2e975c1 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -21,6 +21,7 @@ import logging
 import os
 import re
 import time
+import zipfile
 
 from abc import ABCMeta, abstractmethod
 from collections import defaultdict
@@ -187,7 +188,7 @@ def list_py_file_paths(directory, safe_mode=True):
                         continue
                     mod_name, file_ext = os.path.splitext(
                         os.path.split(file_path)[-1])
-                    if file_ext != '.py':
+                    if file_ext != '.py' and not zipfile.is_zipfile(file_path):
                         continue
                     if any([re.findall(p, file_path) for p in patterns]):
                         continue
@@ -195,7 +196,7 @@ def list_py_file_paths(directory, safe_mode=True):
                     # Heuristic that guesses whether a Python file contains an
                     # Airflow DAG definition.
                     might_contain_dag = True
-                    if safe_mode:
+                    if safe_mode and not zipfile.is_zipfile(file_path):
                         with open(file_path, 'rb') as f:
                             content = f.read()
                             might_contain_dag = all(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3b863f18/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 824cd9d..6e6150b 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -36,7 +36,7 @@ from airflow.operators.bash_operator import BashOperator
 from airflow.utils.db import provide_session
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout
-from airflow.utils.dag_processing import SimpleDagBag
+from airflow.utils.dag_processing import SimpleDagBag, list_py_file_paths
 
 from mock import patch
 from sqlalchemy.orm.session import make_transient
@@ -68,7 +68,8 @@ UNPARSEABLE_DAG_FILE_CONTENTS = 'airflow DAG'
 # Filename to be used for dags that are created in an ad-hoc manner and can be removed/
 # created at runtime
 TEMP_DAG_FILENAME = "temp_dag.py"
-
+TEST_DAGS_FOLDER = os.path.join(
+    os.path.dirname(os.path.realpath(__file__)), 'dags')
 
 class BackfillJobTest(unittest.TestCase):
 
@@ -2011,3 +2012,19 @@ class SchedulerJobTest(unittest.TestCase):
         import_errors = session.query(models.ImportError).all()
 
         self.assertEqual(len(import_errors), 0)
+
+    def test_list_py_file_paths(self):
+        """
+        [JIRA-1357] Test the 'list_py_file_paths' function used by the
+        scheduler to list and load DAGs.
+        """
+        detected_files = []
+        expected_files = []
+        for file_name in os.listdir(TEST_DAGS_FOLDER):
+            if file_name.endswith('.py') or file_name.endswith('.zip'):
+                if file_name not in ['no_dags.py']:
+                    expected_files.append(
+                        '{}/{}'.format(TEST_DAGS_FOLDER, file_name))
+        for file_path in list_py_file_paths(TEST_DAGS_FOLDER):
+            detected_files.append(file_path)
+        self.assertEqual(sorted(detected_files), sorted(expected_files))