You are viewing a plain text version of this content. (The link to the canonical version is not preserved in plain text.)
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/18 04:20:18 UTC

[GitHub] stale[bot] closed pull request #2015: [AIRFLOW-765] Auto detect dag dependency files, variables, and resour…

stale[bot] closed pull request #2015: [AIRFLOW-765] Auto detect dag dependency files, variables, and resour…
URL: https://github.com/apache/incubator-airflow/pull/2015
 
 
   

This pull request comes from a forked repository (it was closed by
stale[bot], as noted above). Because GitHub hides the diff of such a
foreign pull request, the diff is reproduced below for the sake of
provenance:

diff --git a/.rat-excludes b/.rat-excludes
index 1238abb0a0..bdcb289891 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -24,4 +24,4 @@ CHANGELOG.txt
 # it is compatible according to http://www.apache.org/legal/resolved.html#category-a
 kerberos_auth.py
 airflow_api_auth_backend_kerberos_auth_py.html
-
+autoreload.json
diff --git a/airflow/example_dags/config/autoreload.json b/airflow/example_dags/config/autoreload.json
new file mode 100644
index 0000000000..587719bd9c
--- /dev/null
+++ b/airflow/example_dags/config/autoreload.json
@@ -0,0 +1,4 @@
+{
+  "dag_owner": "amin",
+  "schedule_interval": "*/1 * * * *"
+}
\ No newline at end of file
diff --git a/airflow/example_dags/example_autoreload.py b/airflow/example_dags/example_autoreload.py
new file mode 100644
index 0000000000..74f38de978
--- /dev/null
+++ b/airflow/example_dags/example_autoreload.py
@@ -0,0 +1,61 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import print_function
+
+import json
+import os
+from datetime import datetime, timedelta
+
+from airflow.models import DAG, Variable
+from airflow.operators.dummy_operator import DummyOperator
+
+config_path = os.path.join(
+    os.path.dirname(os.path.abspath(__file__)),
+    'config'
+)
+
+# editing any .json file under config_path triggers a reload of this DAG
+__resource_file_selectors__ = [
+    (config_path, ('.json',))  # note the comma: a 1-tuple, not a string
+]
+
+with open(os.path.join(config_path, 'autoreload.json')) as config_data:
+    config_object = json.load(config_data)
+
+# Variable.materialize inserts the Variable (default 7) if it is missing;
+# changing it later triggers an autoreload of the DAG
+number_of_days_back_var = Variable.materialize(
+    'number_of_days_back',
+    default_value=7
+)
+
+number_of_days_back = datetime.combine(
+    datetime.today() -
+    timedelta(days=int(number_of_days_back_var.val)), datetime.min.time()
+)
+
+args = {
+    'owner': config_object['dag_owner'],
+    'start_date': number_of_days_back,
+}
+
+dag = DAG(
+    dag_id='example_autoreload',
+    default_args=args,
+    schedule_interval=config_object['schedule_interval']
+)
+
+run_this = DummyOperator(
+    task_id='dummy_operator',
+    dag=dag)
diff --git a/airflow/migrations/versions/5986598b22a9_add_last_updated_to_variable.py b/airflow/migrations/versions/5986598b22a9_add_last_updated_to_variable.py
new file mode 100644
index 0000000000..4bdb59bab0
--- /dev/null
+++ b/airflow/migrations/versions/5986598b22a9_add_last_updated_to_variable.py
@@ -0,0 +1,40 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Add Last Updated to Variable
+
+Revision ID: 5986598b22a9
+Revises: 5e7d17757c7a
+Create Date: 2017-01-17 23:22:10.142592
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '5986598b22a9'
+down_revision = '5e7d17757c7a'
+branch_labels = None
+depends_on = None
+
+import sqlalchemy as sa
+from alembic import op
+
+
+def upgrade():
+    """Add the last_updated timestamp column to the variable table."""
+    column = sa.Column('last_updated', sa.DateTime(),
+                       default=sa.func.now(), onupdate=sa.func.now())
+    op.add_column('variable', column)
+
+
+def downgrade():
+    op.drop_column('variable', 'last_updated')
diff --git a/airflow/models.py b/airflow/models.py
index aab4833ae8..9fc134f11a 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -73,6 +73,7 @@
 from airflow.utils.db import provide_session
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.email import send_email
+from airflow.utils.file import each
 from airflow.utils.helpers import (
     as_tuple, is_container, is_in, validate_key, pprinttable)
 from airflow.utils.logging import LoggingMixin
@@ -141,6 +142,187 @@ def clear_task_instances(tis, session, activate_dag_runs=True):
             dr.start_date = datetime.now()
 
 
+class DagLoader(LoggingMixin):
+    """
+    Loads dag files and records what each one depends on, so a dag file
+    can be re-processed when one of those dependencies changes:
+
+    * python source files of referenced objects whose path starts with the
+      dag folder (or its bare name) or contains airflow/example_dags (mtime);
+    * MaterializedVariable objects in the module namespace, including inside
+      lists and dicts (compared by the Variable's last_updated column);
+    * files matched by a top level ``__resource_file_selectors__`` list of
+      ``(directory, extensions)`` pairs (mtime).
+
+    Only files present at load time are tracked: a newly added resource file
+    goes unnoticed, and a deleted tracked file makes os.path.getmtime raise.
+
+    Example -- with this in sample_dag.py, editing any ``.yaml`` file under
+    ``config`` makes deps_changed() report a change:
+
+    config_path = os.path.join(os.path.dirname(__file__), 'config')
+    __resource_file_selectors__ = [(config_path, ('.yaml',))]
+
+    :param dag_folder: the folder to scan to find DAGs
+    :type dag_folder: unicode
+    """
+
+    def __init__(self, dag_folder,):
+        self.dag_folder = dag_folder
+        # path -> mtime seen at load time, for dag files and python deps
+        self.file_last_changed = {}
+        self.variable_last_changed = {}  # Variable key -> last_updated
+        self.resource_last_changed = {}  # resource path -> mtime
+        self.dep_files = {}  # dag filepath -> python dependency paths
+        self.dep_variables = {}  # dag filepath -> Variable keys
+        self.dep_resources = {}  # dag filepath -> resource file paths
+
+    def load_source(self, mod_name, filepath):
+        module = imp.load_source(mod_name, filepath)
+        # dependencies are recorded only after a successful import
+        self.get_python_deps(module, filepath)
+        self.get_variable_deps(module, filepath)
+        self.get_resource_deps(module, filepath)
+        return module
+
+    def deps_changed(self, filepath):
+        return (self.dep_files_changed(filepath) or  # first: True for unseen files
+                self.dep_variables_changed(filepath) or
+                self.dep_resources_changed(filepath))
+
+    def dependency_source_files(self, module):
+        abs_dags_folder = os.path.abspath(self.dag_folder)
+        dags_root_folder = os.path.split(abs_dags_folder)[1]  # basename
+        deps = {}  # source path -> member object
+
+        self.get_dep_files_recurse(module, abs_dags_folder, dags_root_folder, deps)
+        return deps
+
+    def get_python_deps(self, module, filepath):
+        if filepath in self.dep_files:  # drop deps recorded by the last load
+            for d in self.dep_files[filepath]:
+                self.file_last_changed.pop(d, None)
+            del self.dep_files[filepath]
+        dep_files = self.dependency_source_files(module).keys()
+        for f in dep_files:
+            self.file_last_changed[f] = datetime.fromtimestamp(
+                os.path.getmtime(f))
+        self.file_last_changed[filepath] = datetime.fromtimestamp(
+            os.path.getmtime(filepath))
+        self.dep_files[filepath] = dep_files
+
+    def get_variable_deps(self, module, filepath):
+        if filepath in self.dep_variables:
+            for v in self.dep_variables[filepath]:
+                self.variable_last_changed.pop(v, None)
+            del self.dep_variables[filepath]
+        dep_variables = []
+        for key in module.__dict__:
+            self.recurse_for_variables(module.__dict__[key], dep_variables)
+        self.dep_variables[filepath] = dep_variables
+
+    def recurse_for_variables(self, attr, dep_variables):
+        if isinstance(attr, MaterializedVariable):
+            dep_variables.append(attr.key)
+            self.variable_last_changed[attr.key] = attr.last_updated
+        elif isinstance(attr, list):
+            for _, o in enumerate(attr):
+                self.recurse_for_variables(o, dep_variables)
+        elif isinstance(attr, dict):  # tuples and sets are not searched
+            for key in attr:
+                self.recurse_for_variables(attr[key], dep_variables)
+
+    def get_resource_deps(self, module, filepath):
+        if filepath in self.dep_resources:
+            for r in self.dep_resources[filepath]:
+                self.resource_last_changed.pop(r, None)
+            del self.dep_resources[filepath]
+        dep_resources = []
+
+        def on_resource_file(abs_path, *_):  # ignores rel_path, filename
+            dep_resources.append(abs_path)
+            self.resource_last_changed[abs_path] = datetime.fromtimestamp(
+                os.path.getmtime(abs_path))
+
+        if '__resource_file_selectors__' in module.__dict__:
+            resource_files = module.__dict__['__resource_file_selectors__']
+            for (rf_path, rf_exts) in resource_files:
+                each(func=on_resource_file,
+                     path=rf_path,
+                     file_extensions=rf_exts)
+        self.dep_resources[filepath] = dep_resources
+
+    def dep_files_changed(self, filepath):
+        dttm = datetime.fromtimestamp(os.path.getmtime(filepath))
+        if (filepath not in self.dep_files or
+                filepath not in self.file_last_changed):
+            Stats.incr('source_file_added')
+            return True
+        if dttm != self.file_last_changed[filepath]:
+            Stats.incr('source_file_changed')
+            return True
+        deps = self.dep_files[filepath]
+        for d in deps:
+            dttm = datetime.fromtimestamp(os.path.getmtime(d))
+            if d not in self.file_last_changed:
+                Stats.incr('dep_source_file_added')
+                return True
+            elif dttm != self.file_last_changed[d]:
+                Stats.incr('dep_source_file_changed')
+                return True
+        return False
+
+    @provide_session
+    def dep_variables_changed(self, filepath, session=None):
+        dep_variables = self.dep_variables[filepath]
+        for v in dep_variables:
+            var = session.query(Variable).filter(Variable.key == v).first()
+            if var is None:  # Variable row was deleted
+                return True
+            dttm = var.last_updated
+            if v not in self.variable_last_changed:
+                Stats.incr('dep_var_added')
+                return True
+            elif dttm != self.variable_last_changed[v]:
+                Stats.incr('dep_var_changed')
+                return True
+        return False
+
+    def dep_resources_changed(self, filepath):
+        dep_resources = self.dep_resources[filepath]
+        for d in dep_resources:
+            dttm = datetime.fromtimestamp(os.path.getmtime(d))
+            if d not in self.resource_last_changed:
+                Stats.incr('dep_resource_added')
+                return True
+            elif dttm != self.resource_last_changed[d]:
+                Stats.incr('dep_resource_changed')
+                return True
+        return False
+
+    @classmethod
+    def get_dep_files_recurse(cls, module, abs_dags_folder, dags_root_folder, deps):
+        for i in inspect.getmembers(module):  # (name, value) pairs
+            sf = cls.get_source_file(i[1])
+            if sf is not None and (sf.find(abs_dags_folder) == 0 or
+                                   sf.find(dags_root_folder) == 0 or
+                                   sf.find('airflow/example_dags') > -1):
+                if sf in deps.keys():  # visited; also breaks import cycles
+                    continue
+                deps[sf] = i[1]
+                cls.get_dep_files_recurse(i[1],
+                                          abs_dags_folder=abs_dags_folder,
+                                          dags_root_folder=dags_root_folder,
+                                          deps=deps)
+
+    @classmethod
+    def get_source_file(cls, module):
+        try:
+            return inspect.getsourcefile(module)
+        except Exception:  # e.g. TypeError for built-in objects
+            return None
+
+
 class DagBag(BaseDagBag, LoggingMixin):
     """
     A dagbag is a collection of dags, parsed out of a folder tree and has high
@@ -172,9 +354,8 @@ def __init__(
         dag_folder = dag_folder or settings.DAGS_FOLDER
         self.logger.info("Filling up the DagBag from {}".format(dag_folder))
         self.dag_folder = dag_folder
+        self.dag_loader = DagLoader(dag_folder)
         self.dags = {}
-        # the file's last modified timestamp when we last read it
-        self.file_last_changed = {}
         self.executor = executor
         self.import_errors = {}
 
@@ -185,6 +366,10 @@ def __init__(
             self.collect_dags(example_dag_folder)
         self.collect_dags(dag_folder)
 
+    @property
+    def file_last_changed(self):  # replaces the old DagBag dict attribute
+        return self.dag_loader.file_last_changed
+
     def size(self):
         """
         :return: the amount of dags contained in this dagbag
@@ -237,9 +422,7 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
             # This failed before in what may have been a git sync
             # race condition
             file_last_changed_on_disk = datetime.fromtimestamp(os.path.getmtime(filepath))
-            if only_if_updated \
-                    and filepath in self.file_last_changed \
-                    and file_last_changed_on_disk == self.file_last_changed[filepath]:
+            if only_if_updated and not self.dag_loader.deps_changed(filepath):
                 return found_dags
 
         except Exception as e:
@@ -266,7 +449,7 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
 
             with timeout(configuration.getint('core', "DAGBAG_IMPORT_TIMEOUT")):
                 try:
-                    m = imp.load_source(mod_name, filepath)
+                    m = self.dag_loader.load_source(mod_name, filepath)
                     mods.append(m)
                 except Exception as e:
                     self.logger.exception("Failed to import: " + filepath)
@@ -3618,6 +3801,17 @@ def __repr__(self):
         return self.label
 
 
+class MaterializedVariable(object):
+    """
+    Snapshot of a Variable (key, value and last_updated timestamp). A change
+    of last_updated is treated by DagLoader as a reason to reload the dag.
+    """
+    def __init__(self, key, val, last_updated):
+        # plain attribute holder; no database access happens here
+        self.key, self.val = key, val
+        self.last_updated = last_updated
+
+
 class Variable(Base):
     __tablename__ = "variable"
 
@@ -3625,6 +3819,7 @@ class Variable(Base):
     key = Column(String(ID_LEN), unique=True)
     _val = Column('val', Text)
     is_encrypted = Column(Boolean, unique=False, default=False)
+    last_updated = Column(DateTime, default=func.now(), onupdate=func.now())
 
     def __repr__(self):
         # Hiding the value
@@ -3714,6 +3909,34 @@ def set(cls, key, value, serialize_json=False, session=None):
         session.add(Variable(key=key, val=stored_value))
         session.flush()
 
+    @classmethod
+    @provide_session
+    def materialize(cls, key, default_value,
+                    deserialize_json=False, session=None):
+        """
+        Return a MaterializedVariable snapshot (key, value, last_updated) of
+        Variable ``key``, inserting it with ``default_value`` if missing.
+
+        :param key: Dict key for this Variable
+        :type key: String
+        :param default_value: value stored when the Variable does not exist
+        :param deserialize_json: whether to (de)serialize the value as JSON
+        :type deserialize_json: boolean
+        :param session: the session, shared with the insert below
+        :return: the snapshot; never None, a missing Variable is created
+        :rtype: MaterializedVariable
+        """
+        obj = session.query(cls).filter(cls.key == key).first()
+        if obj is None:
+            # insert through the same session so the re-query sees the row
+            cls.set(key, default_value, serialize_json=deserialize_json,
+                    session=session)
+            obj = session.query(cls).filter(cls.key == key).first()
+        return MaterializedVariable(
+            obj.key,
+            json.loads(obj.val) if deserialize_json else obj.val,
+            obj.last_updated)
+
 
 class XCom(Base):
     """
diff --git a/airflow/utils/file.py b/airflow/utils/file.py
index 352755e405..9f3cf6c6f7 100644
--- a/airflow/utils/file.py
+++ b/airflow/utils/file.py
@@ -18,9 +18,10 @@
 import errno
 import os
 import shutil
+from contextlib import contextmanager
 from tempfile import mkdtemp
 
-from contextlib import contextmanager
+from past.builtins import basestring
 
 
 @contextmanager
@@ -55,3 +56,37 @@ def mkdirs(path, mode):
             raise
     finally:
         os.umask(o_umask)
+
+
+def each(func, path, file_extensions, filters=None):
+    """
+    Call ``func(abs_path, rel_path, filename)`` for every file under ``path``
+    (walked recursively) whose name ends with one of ``file_extensions``.
+
+    :param func: callback; returning ``False`` stops the walk early
+    :type func: function
+    :param path: the directory of files to loop through
+    :type path: str
+    :param file_extensions: one extension or an iterable of them, eg: '.yaml'
+    :type file_extensions: str or collections.Iterable
+    :param filters: if given, only call ``func`` for these file names
+    :type filters: list or set
+    :return: dict mapping every visited absolute path to ``func``'s result
+    """
+    if isinstance(file_extensions, basestring):
+        file_extensions = [file_extensions]
+    suffixes = tuple(file_extensions)  # str.endswith accepts a tuple
+    rets = {}
+    for root, _, files in os.walk(path):
+        for filename in sorted(files):
+            if filters and filename not in filters:
+                continue
+            if not filename.endswith(suffixes):
+                continue
+            abs_path = os.path.join(root, filename)
+            # relpath is also correct when ``path`` ends with a separator
+            rel_path = os.path.relpath(abs_path, path)
+            rets[abs_path] = func(abs_path, rel_path, filename)
+            if rets[abs_path] is False:  # early exit requested by func
+                return rets
+    return rets


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log in to GitHub and use the
URL above to go to the specific pull request.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services