You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/07/11 19:04:11 UTC
incubator-airflow git commit: [AIRFLOW-1357] Fix scheduler zip file support
Repository: incubator-airflow
Updated Branches:
refs/heads/master f6f73b565 -> 3b863f182
[AIRFLOW-1357] Fix scheduler zip file support
Zipped DAGs are supported by the models but have not been taken into
account by the scheduler since 1.8. This fixes the issue.
Closes #2406 from ultrabug/jira_1357
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3b863f18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3b863f18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3b863f18
Branch: refs/heads/master
Commit: 3b863f182d42dc1ba76c2623b9983698a682d5f7
Parents: f6f73b5
Author: Ultrabug <ul...@gentoo.org>
Authored: Tue Jul 11 21:03:43 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Jul 11 21:03:53 2017 +0200
----------------------------------------------------------------------
airflow/utils/dag_processing.py | 5 +++--
tests/jobs.py | 21 +++++++++++++++++++--
2 files changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3b863f18/airflow/utils/dag_processing.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index cdcc7b0..2e975c1 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -21,6 +21,7 @@ import logging
import os
import re
import time
+import zipfile
from abc import ABCMeta, abstractmethod
from collections import defaultdict
@@ -187,7 +188,7 @@ def list_py_file_paths(directory, safe_mode=True):
continue
mod_name, file_ext = os.path.splitext(
os.path.split(file_path)[-1])
- if file_ext != '.py':
+ if file_ext != '.py' and not zipfile.is_zipfile(file_path):
continue
if any([re.findall(p, file_path) for p in patterns]):
continue
@@ -195,7 +196,7 @@ def list_py_file_paths(directory, safe_mode=True):
# Heuristic that guesses whether a Python file contains an
# Airflow DAG definition.
might_contain_dag = True
- if safe_mode:
+ if safe_mode and not zipfile.is_zipfile(file_path):
with open(file_path, 'rb') as f:
content = f.read()
might_contain_dag = all(
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3b863f18/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 824cd9d..6e6150b 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -36,7 +36,7 @@ from airflow.operators.bash_operator import BashOperator
from airflow.utils.db import provide_session
from airflow.utils.state import State
from airflow.utils.timeout import timeout
-from airflow.utils.dag_processing import SimpleDagBag
+from airflow.utils.dag_processing import SimpleDagBag, list_py_file_paths
from mock import patch
from sqlalchemy.orm.session import make_transient
@@ -68,7 +68,8 @@ UNPARSEABLE_DAG_FILE_CONTENTS = 'airflow DAG'
# Filename to be used for dags that are created in an ad-hoc manner and can be removed/
# created at runtime
TEMP_DAG_FILENAME = "temp_dag.py"
-
+TEST_DAGS_FOLDER = os.path.join(
+ os.path.dirname(os.path.realpath(__file__)), 'dags')
class BackfillJobTest(unittest.TestCase):
@@ -2011,3 +2012,19 @@ class SchedulerJobTest(unittest.TestCase):
import_errors = session.query(models.ImportError).all()
self.assertEqual(len(import_errors), 0)
+
+ def test_list_py_file_paths(self):
+ """
+ [JIRA-1357] Test the 'list_py_file_paths' function used by the
+ scheduler to list and load DAGs.
+ """
+ detected_files = []
+ expected_files = []
+ for file_name in os.listdir(TEST_DAGS_FOLDER):
+ if file_name.endswith('.py') or file_name.endswith('.zip'):
+ if file_name not in ['no_dags.py']:
+ expected_files.append(
+ '{}/{}'.format(TEST_DAGS_FOLDER, file_name))
+ for file_path in list_py_file_paths(TEST_DAGS_FOLDER):
+ detected_files.append(file_path)
+ self.assertEqual(sorted(detected_files), sorted(expected_files))