Posted to commits@airflow.apache.org by sa...@apache.org on 2016/09/28 00:07:32 UTC

incubator-airflow git commit: [AIRFLOW-198] Implement latest_only_operator

Repository: incubator-airflow
Updated Branches:
  refs/heads/master d4013f919 -> edf033be6


[AIRFLOW-198] Implement latest_only_operator

Dear Airflow Maintainers,

Please accept this PR that addresses the following issue:
- https://issues.apache.org/jira/browse/AIRFLOW-198

Testing Done:
- Local testing of DAG operation with LatestOnlyOperator
- Unit test added

Closes #1752 from gwax/latest_only


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/edf033be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/edf033be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/edf033be

Branch: refs/heads/master
Commit: edf033be65b575f44aa221d5d0ec9ecb6b32c67a
Parents: d4013f9
Author: George Leslie-Waksman <ge...@cloverhealth.com>
Authored: Tue Sep 27 17:07:14 2016 -0700
Committer: Siddharth Anand <si...@yahoo.com>
Committed: Tue Sep 27 17:07:14 2016 -0700

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 airflow/example_dags/example_latest_only.py     |  34 +++++++
 .../example_latest_only_with_trigger.py         |  43 +++++++++
 airflow/operators/__init__.py                   |   1 +
 airflow/operators/latest_only_operator.py       |  57 ++++++++++++
 docs/concepts.rst                               |  74 +++++++++++++++
 docs/img/latest_only_with_trigger.png           | Bin 0 -> 40034 bytes
 setup.py                                        |   2 +-
 tests/core.py                                   |   2 +-
 tests/operators/latest_only_operator.py         |  93 +++++++++++++++++++
 10 files changed, 305 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/edf033be/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 48af479..ca6de2b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,6 +5,7 @@
 .DS_Store
 .ipynb*
 .coverage
+.python-version
 airflow/git_version
 airflow/www/static/coverage/
 airflow.db

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/edf033be/airflow/example_dags/example_latest_only.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_latest_only.py b/airflow/example_dags/example_latest_only.py
new file mode 100644
index 0000000..9ce03b9
--- /dev/null
+++ b/airflow/example_dags/example_latest_only.py
@@ -0,0 +1,34 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Example of the LatestOnlyOperator
+"""
+import datetime as dt
+
+from airflow.models import DAG
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.latest_only_operator import LatestOnlyOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+
+dag = DAG(
+    dag_id='latest_only',
+    schedule_interval=dt.timedelta(hours=4),
+    start_date=dt.datetime(2016, 9, 20),
+)
+
+latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
+
+task1 = DummyOperator(task_id='task1', dag=dag)
+task1.set_upstream(latest_only)

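A note on trying the example above locally: individual task runs can be
exercised without a scheduler via the task-level run() API, the same
pattern the new unit test later in this commit uses. A minimal sketch,
assuming airflow is installed and the example module is importable; the
dates are illustrative:

    import datetime as dt

    from airflow.example_dags.example_latest_only import latest_only, task1

    # Run the gating task, then its downstream task, for one past interval.
    # For any interval that is not the latest, task1 should end up in the
    # 'skipped' state rather than running.
    latest_only.run(start_date=dt.datetime(2016, 9, 20),
                    end_date=dt.datetime(2016, 9, 20))
    task1.run(start_date=dt.datetime(2016, 9, 20),
              end_date=dt.datetime(2016, 9, 20))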
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/edf033be/airflow/example_dags/example_latest_only_with_trigger.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_latest_only_with_trigger.py b/airflow/example_dags/example_latest_only_with_trigger.py
new file mode 100644
index 0000000..e3a88b7
--- /dev/null
+++ b/airflow/example_dags/example_latest_only_with_trigger.py
@@ -0,0 +1,43 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Example LatestOnlyOperator and TriggerRule interactions
+"""
+import datetime as dt
+
+from airflow.models import DAG
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.latest_only_operator import LatestOnlyOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+
+dag = DAG(
+    dag_id='latest_only_with_trigger',
+    schedule_interval=dt.timedelta(hours=4),
+    start_date=dt.datetime(2016, 9, 20),
+)
+
+latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
+
+task1 = DummyOperator(task_id='task1', dag=dag)
+task1.set_upstream(latest_only)
+
+task2 = DummyOperator(task_id='task2', dag=dag)
+
+task3 = DummyOperator(task_id='task3', dag=dag)
+task3.set_upstream([task1, task2])
+
+task4 = DummyOperator(task_id='task4', dag=dag,
+                      trigger_rule=TriggerRule.ALL_DONE)
+task4.set_upstream([task1, task2])

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/edf033be/airflow/operators/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/operators/__init__.py b/airflow/operators/__init__.py
index f39ad01..4cfac7b 100644
--- a/airflow/operators/__init__.py
+++ b/airflow/operators/__init__.py
@@ -57,6 +57,7 @@ _operators = {
     'dummy_operator': ['DummyOperator'],
     'email_operator': ['EmailOperator'],
     'hive_to_samba_operator': ['Hive2SambaOperator'],
+    'latest_only_operator': ['LatestOnlyOperator'],
     'mysql_operator': ['MySqlOperator'],
     'sqlite_operator': ['SqliteOperator'],
     'mysql_to_hive': ['MySqlToHiveTransfer'],

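The registry entry above is what exposes the new operator through the
airflow.operators namespace. Both import styles below should resolve to
the same class, though the second relies on the 1.x dynamic attribute
loading in airflow/operators/__init__.py, which is an assumption about
the import machinery rather than something shown in this diff:

    # Direct module import: always available.
    from airflow.operators.latest_only_operator import LatestOnlyOperator

    # Registry-backed import: enabled by the _operators mapping above.
    import airflow.operators

    assert airflow.operators.LatestOnlyOperator is LatestOnlyOperator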
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/edf033be/airflow/operators/latest_only_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/latest_only_operator.py b/airflow/operators/latest_only_operator.py
new file mode 100644
index 0000000..49ba2a3
--- /dev/null
+++ b/airflow/operators/latest_only_operator.py
@@ -0,0 +1,57 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import logging
+
+from airflow.models import BaseOperator, TaskInstance
+from airflow.utils.state import State
+from airflow import settings
+
+
+class LatestOnlyOperator(BaseOperator):
+    """
+    Allows a workflow to skip tasks that are not running during the most
+    recent schedule interval.
+
+    If the task is run outside of the latest schedule interval, all
+    directly downstream tasks will be skipped.
+    """
+
+    ui_color = '#e9ffdb'  # nyanza
+
+    def execute(self, context):
+        now = datetime.datetime.now()
+        left_window = context['dag'].following_schedule(
+            context['execution_date'])
+        right_window = context['dag'].following_schedule(left_window)
+        logging.info(
+            'Checking latest only with left_window: %s right_window: %s '
+            'now: %s', left_window, right_window, now)
+        if not left_window < now <= right_window:
+            logging.info('Not latest execution, skipping downstream.')
+            session = settings.Session()
+            for task in context['task'].downstream_list:
+                ti = TaskInstance(
+                    task, execution_date=context['ti'].execution_date)
+                logging.info('Skipping task: %s', ti.task_id)
+                ti.state = State.SKIPPED
+                ti.start_date = now
+                ti.end_date = now
+                session.merge(ti)
+            session.commit()
+            session.close()
+            logging.info('Done.')
+        else:
+            logging.info('Latest, allowing execution to proceed.')

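The heart of execute() above is the window check: a run counts as
"latest" only when the wall clock falls after this run's scheduled start
(the following_schedule of its execution_date) and at or before the next
run's scheduled start. A standalone sketch of that check with plain
datetimes; the 4-hour interval is illustrative, and the additions stand
in for following_schedule() on a timedelta-scheduled DAG:

    import datetime

    SCHEDULE = datetime.timedelta(hours=4)

    def is_latest(execution_date, now):
        # Stand-in for dag.following_schedule(): with a timedelta
        # schedule_interval it just adds the interval.
        left_window = execution_date + SCHEDULE
        right_window = left_window + SCHEDULE
        return left_window < now <= right_window

    now = datetime.datetime(2016, 9, 27, 18, 0)
    print(is_latest(datetime.datetime(2016, 9, 27, 12, 0), now))  # True
    print(is_latest(datetime.datetime(2016, 9, 27, 0, 0), now))   # False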
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/edf033be/docs/concepts.rst
----------------------------------------------------------------------
diff --git a/docs/concepts.rst b/docs/concepts.rst
index 8cfc8ab..82d5248 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -594,6 +594,80 @@ that, when set to ``True``, keeps a task from getting triggered if the
 previous schedule for the task hasn't succeeded.
 
 
+Latest Run Only
+===============
+
+Standard workflow behavior involves running a series of tasks for a
+particular date/time range. Some workflows, however, perform tasks that
+are independent of run time but need to be run on a schedule, much like a
+standard cron job. In these cases, backfills or running jobs missed during
+a pause just waste CPU cycles.
+
+For situations like this, you can use the ``LatestOnlyOperator`` to skip
+tasks that are not being run during the most recent scheduled run for a
+DAG. The ``LatestOnlyOperator`` skips all immediate downstream tasks (but
+not itself) if the current time does not fall between this run's scheduled
+start time and the next run's scheduled start time.
+
+One must be aware of the interaction between skipped tasks and trigger
+rules. Skipped tasks will cascade through the trigger rules
+``all_success`` and ``all_failed``, but not through ``all_done``,
+``one_failed``, ``one_success``, or ``dummy``. If you would like to use
+the ``LatestOnlyOperator`` with trigger rules that do not cascade skips,
+you will need to ensure that the ``LatestOnlyOperator`` is **directly**
+upstream of the task you would like to skip.
+
+It is possible, through use of trigger rules, to mix tasks that should
+run in the typical date/time-dependent mode with those using the
+``LatestOnlyOperator``.
+
+For example, consider the following dag:
+
+.. code:: python
+
+  #dags/latest_only_with_trigger.py
+  import datetime as dt
+
+  from airflow.models import DAG
+  from airflow.operators.dummy_operator import DummyOperator
+  from airflow.operators.latest_only_operator import LatestOnlyOperator
+  from airflow.utils.trigger_rule import TriggerRule
+
+
+  dag = DAG(
+      dag_id='latest_only_with_trigger',
+      schedule_interval=dt.timedelta(hours=4),
+      start_date=dt.datetime(2016, 9, 20),
+  )
+
+  latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
+
+  task1 = DummyOperator(task_id='task1', dag=dag)
+  task1.set_upstream(latest_only)
+
+  task2 = DummyOperator(task_id='task2', dag=dag)
+
+  task3 = DummyOperator(task_id='task3', dag=dag)
+  task3.set_upstream([task1, task2])
+
+  task4 = DummyOperator(task_id='task4', dag=dag,
+                        trigger_rule=TriggerRule.ALL_DONE)
+  task4.set_upstream([task1, task2])
+
+In the case of this dag, the ``latest_only`` task runs in every scheduled
+period but only lets execution proceed downstream for the latest run.
+``task1`` is directly downstream of ``latest_only`` and will be skipped
+for all runs except the latest. ``task2`` is entirely independent of
+``latest_only`` and will run in all scheduled periods. ``task3`` is
+downstream of ``task1`` and ``task2`` and, because the default
+``trigger_rule`` is ``all_success``, will receive a cascaded skip from
+``task1``. ``task4`` is downstream of ``task1`` and ``task2``, but since
+its ``trigger_rule`` is ``all_done`` it will trigger as soon as ``task1``
+has been skipped (a valid completion state) and ``task2`` has succeeded.
+
+.. image:: img/latest_only_with_trigger.png
+
+
 Zombies & Undeads
 =================
 

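One practical consequence of the "directly upstream" caveat in the docs
above: because execute() writes the SKIPPED state straight onto the
operator's immediate downstream task instances, a skip reaches a task
with a non-cascading trigger rule (such as all_done) only when the
LatestOnlyOperator feeds it directly. A sketch under that assumption;
the dag_id and task names are illustrative:

    import datetime as dt

    from airflow.models import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.latest_only_operator import LatestOnlyOperator
    from airflow.utils.trigger_rule import TriggerRule

    dag = DAG(
        dag_id='latest_only_direct',
        schedule_interval=dt.timedelta(hours=4),
        start_date=dt.datetime(2016, 9, 20),
    )

    latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)

    # Directly downstream of latest_only: skipped on non-latest runs even
    # though all_done would not cascade a skip arriving from further
    # upstream.
    cleanup = DummyOperator(task_id='cleanup', dag=dag,
                            trigger_rule=TriggerRule.ALL_DONE)
    cleanup.set_upstream(latest_only)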
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/edf033be/docs/img/latest_only_with_trigger.png
----------------------------------------------------------------------
diff --git a/docs/img/latest_only_with_trigger.png b/docs/img/latest_only_with_trigger.png
new file mode 100644
index 0000000..629adfa
Binary files /dev/null and b/docs/img/latest_only_with_trigger.png differ

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/edf033be/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 2255734..a63c1be 100644
--- a/setup.py
+++ b/setup.py
@@ -159,7 +159,7 @@ qds = ['qds-sdk>=1.9.0']
 cloudant = ['cloudant>=0.5.9,<2.0'] # major update coming soon, clamp to 0.x
 
 all_dbs = postgres + mysql + hive + mssql + hdfs + vertica + cloudant
-devel = ['lxml>=3.3.4', 'nose', 'nose-parameterized', 'mock', 'click', 'jira', 'moto']
+devel = ['lxml>=3.3.4', 'nose', 'nose-parameterized', 'mock', 'click', 'jira', 'moto', 'freezegun']
 devel_minreq = devel + mysql + doc + password + s3
 devel_hadoop = devel_minreq + hive + hdfs + webhdfs + kerberos
 devel_all = devel + all_dbs + doc + samba + s3 + slack + crypto + oracle + docker

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/edf033be/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index e443a03..cffdc1f 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -60,7 +60,7 @@ from airflow.configuration import AirflowConfigException
 
 import six
 
-NUM_EXAMPLE_DAGS = 16
+NUM_EXAMPLE_DAGS = 18
 DEV_NULL = '/dev/null'
 TEST_DAG_FOLDER = os.path.join(
     os.path.dirname(os.path.realpath(__file__)), 'dags')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/edf033be/tests/operators/latest_only_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/latest_only_operator.py b/tests/operators/latest_only_operator.py
new file mode 100644
index 0000000..37aec38
--- /dev/null
+++ b/tests/operators/latest_only_operator.py
@@ -0,0 +1,93 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function, unicode_literals
+
+import datetime
+import logging
+import unittest
+
+from airflow import configuration, DAG, settings
+from airflow.jobs import BackfillJob
+from airflow.models import TaskInstance
+from airflow.operators.latest_only_operator import LatestOnlyOperator
+from airflow.operators.dummy_operator import DummyOperator
+from freezegun import freeze_time
+
+DEFAULT_DATE = datetime.datetime(2016, 1, 1)
+END_DATE = datetime.datetime(2016, 1, 2)
+INTERVAL = datetime.timedelta(hours=12)
+FROZEN_NOW = datetime.datetime(2016, 1, 2, 12, 1, 1)
+
+
+def get_task_instances(task_id):
+    session = settings.Session()
+    return session \
+        .query(TaskInstance) \
+        .filter(TaskInstance.task_id == task_id) \
+        .order_by(TaskInstance.execution_date) \
+        .all()
+
+
+class LatestOnlyOperatorTest(unittest.TestCase):
+
+    def setUp(self):
+        super(LatestOnlyOperatorTest, self).setUp()
+        configuration.load_test_config()
+        self.dag = DAG(
+            'test_dag',
+            default_args={
+                'owner': 'airflow',
+                'start_date': DEFAULT_DATE},
+            schedule_interval=INTERVAL)
+        self.addCleanup(self.dag.clear)
+        freezer = freeze_time(FROZEN_NOW)
+        freezer.start()
+        self.addCleanup(freezer.stop)
+
+    def test_run(self):
+        task = LatestOnlyOperator(
+            task_id='latest',
+            dag=self.dag)
+        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+    def test_skipping(self):
+        latest_task = LatestOnlyOperator(
+            task_id='latest',
+            dag=self.dag)
+        downstream_task = DummyOperator(
+            task_id='downstream',
+            dag=self.dag)
+        downstream_task.set_upstream(latest_task)
+
+        latest_task.run(start_date=DEFAULT_DATE, end_date=END_DATE)
+        downstream_task.run(start_date=DEFAULT_DATE, end_date=END_DATE)
+
+        latest_instances = get_task_instances('latest')
+        exec_date_to_latest_state = {
+            ti.execution_date: ti.state for ti in latest_instances}
+        assert exec_date_to_latest_state == {
+            datetime.datetime(2016, 1, 1): 'success',
+            datetime.datetime(2016, 1, 1, 12): 'success',
+            datetime.datetime(2016, 1, 2): 'success',
+        }
+
+        downstream_instances = get_task_instances('downstream')
+        exec_date_to_downstream_state = {
+            ti.execution_date: ti.state for ti in downstream_instances}
+        assert exec_date_to_downstream_state == {
+            datetime.datetime(2016, 1, 1): 'skipped',
+            datetime.datetime(2016, 1, 1, 12): 'skipped',
+            datetime.datetime(2016, 1, 2): 'success',
+        }
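A closing note on the new freezegun dependency added to setup.py: these
tests pin the wall clock so the operator's "latest" window is
deterministic across test runs. The pattern, in a minimal sketch:

    import datetime

    from freezegun import freeze_time

    # Inside the frozen block, datetime.datetime.now() returns the pinned
    # instant, so the window check in LatestOnlyOperator.execute() sees
    # the same "now" on every run of the test suite.
    with freeze_time(datetime.datetime(2016, 1, 2, 12, 1, 1)):
        assert datetime.datetime.now() == datetime.datetime(2016, 1, 2, 12, 1, 1)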