Posted to commits@airflow.apache.org by jl...@apache.org on 2017/02/12 21:06:40 UTC

incubator-airflow git commit: [AIRFLOW-862] Add DaskExecutor

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 94dc7fb0a -> 6e2210278


[AIRFLOW-862] Add DaskExecutor

Adds a DaskExecutor for running Airflow tasks
in Dask clusters.

Closes #2067 from jlowin/dask-executor


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6e221027
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6e221027
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6e221027

Branch: refs/heads/master
Commit: 6e2210278235d42bbc3a60e1e14bbf0f9127b54f
Parents: 94dc7fb
Author: Jeremiah Lowin <jl...@apache.org>
Authored: Sun Feb 12 16:06:31 2017 -0500
Committer: Jeremiah Lowin <jl...@apache.org>
Committed: Sun Feb 12 16:06:31 2017 -0500

----------------------------------------------------------------------
 UPDATING.md                        |   8 ++
 airflow/configuration.py           |  10 +-
 airflow/executors/__init__.py      |  13 ++-
 airflow/executors/dask_executor.py |  78 ++++++++++++++
 docs/configuration.rst             |  35 +++++++
 scripts/ci/requirements.txt        |   1 +
 setup.py                           |   4 +
 tests/__init__.py                  |   1 +
 tests/executor/__init__.py         |  13 ---
 tests/executor/test_executor.py    |  33 ------
 tests/executors/__init__.py        |  15 +++
 tests/executors/dask_executor.py   | 178 ++++++++++++++++++++++++++++++++
 tests/executors/test_executor.py   |  33 ++++++
 tests/jobs.py                      |   2 +-
 14 files changed, 369 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e221027/UPDATING.md
----------------------------------------------------------------------
diff --git a/UPDATING.md b/UPDATING.md
index b0ab212..ba708cd 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -3,6 +3,14 @@
 This file documents any backwards-incompatible changes in Airflow and
 assists people when migrating to a new version.
 
+## Master
+
+### New Features
+
+#### Dask Executor
+
+A new DaskExecutor allows Airflow tasks to be run in Dask Distributed clusters.
+
 ## Airflow 1.8
 
 ### Database

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e221027/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 6752bdb..cfccbe9 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -114,7 +114,7 @@ encrypt_s3_logs = False
 s3_log_folder =
 
 # The executor class that airflow should use. Choices include
-# SequentialExecutor, LocalExecutor, CeleryExecutor
+# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor
 executor = SequentialExecutor
 
 # The SqlAlchemy connection string to the metadata database.
@@ -333,6 +333,14 @@ flower_port = 5555
 default_queue = default
 
 
+[dask]
+# This section only applies if you are using the DaskExecutor in the
+# [core] section above
+
+# The IP address and port of the Dask cluster's scheduler.
+cluster_address = 127.0.0.1:8786
+
+
 [scheduler]
 # Task instances listen for external kill signal (when you clear tasks
 # from the CLI or the UI), this defines the frequency at which they should

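For anyone wiring this up: enabling the new executor touches two sections of
airflow.cfg, the executor key in [core] and the new [dask] section above. A
minimal fragment (the address shown is only the local default; point it at
your own Dask scheduler):

    [core]
    executor = DaskExecutor

    [dask]
    cluster_address = 127.0.0.1:8786
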
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e221027/airflow/executors/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/executors/__init__.py b/airflow/executors/__init__.py
index 77f139e..cd78f69 100644
--- a/airflow/executors/__init__.py
+++ b/airflow/executors/__init__.py
@@ -20,11 +20,6 @@ from airflow.executors.base_executor import BaseExecutor
 from airflow.executors.local_executor import LocalExecutor
 from airflow.executors.sequential_executor import SequentialExecutor
 
-try:
-    from airflow.executors.celery_executor import CeleryExecutor
-except:
-    pass
-
 from airflow.exceptions import AirflowException
 
 
@@ -39,10 +34,14 @@ _EXECUTOR = configuration.get('core', 'EXECUTOR')
 
 if _EXECUTOR == 'LocalExecutor':
     DEFAULT_EXECUTOR = LocalExecutor()
-elif _EXECUTOR == 'CeleryExecutor':
-    DEFAULT_EXECUTOR = CeleryExecutor()
 elif _EXECUTOR == 'SequentialExecutor':
     DEFAULT_EXECUTOR = SequentialExecutor()
+elif _EXECUTOR == 'CeleryExecutor':
+    from airflow.executors.celery_executor import CeleryExecutor
+    DEFAULT_EXECUTOR = CeleryExecutor()
+elif _EXECUTOR == 'DaskExecutor':
+    from airflow.executors.dask_executor import DaskExecutor
+    DEFAULT_EXECUTOR = DaskExecutor()
 elif _EXECUTOR == 'MesosExecutor':
     from airflow.contrib.executors.mesos_executor import MesosExecutor
     DEFAULT_EXECUTOR = MesosExecutor()

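A note on the import shuffle above: the old module-level try/except around
the CeleryExecutor import used a bare "except: pass", so a broken or missing
celery install was silently swallowed and only surfaced later as a confusing
NameError. Importing inside the elif branch means an optional dependency is
only loaded when that executor is actually selected, and a genuine
ImportError propagates. A minimal sketch of the pattern:

    from airflow import configuration

    _EXECUTOR = configuration.get('core', 'EXECUTOR')

    if _EXECUTOR == 'DaskExecutor':
        # deferred import: 'distributed' is only required if selected
        from airflow.executors.dask_executor import DaskExecutor
        DEFAULT_EXECUTOR = DaskExecutor()
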
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e221027/airflow/executors/dask_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/dask_executor.py b/airflow/executors/dask_executor.py
new file mode 100644
index 0000000..9aa7426
--- /dev/null
+++ b/airflow/executors/dask_executor.py
@@ -0,0 +1,78 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import distributed
+
+import subprocess
+import warnings
+
+from airflow import configuration
+from airflow.executors.base_executor import BaseExecutor
+from airflow.utils.state import State
+
+
+class DaskExecutor(BaseExecutor):
+    """
+    DaskExecutor submits tasks to a Dask Distributed cluster.
+    """
+    def __init__(self, cluster_address=None):
+        if cluster_address is None:
+            cluster_address = configuration.get('dask', 'cluster_address')
+        if not cluster_address:
+            raise ValueError(
+                'Please provide a Dask cluster address in airflow.cfg')
+        self.cluster_address = cluster_address
+        super(DaskExecutor, self).__init__(parallelism=0)
+
+    def start(self):
+        self.client = distributed.Client(self.cluster_address)
+        self.futures = {}
+
+    def execute_async(self, key, command, queue=None):
+        if queue is not None:
+            warnings.warn(
+                'DaskExecutor does not support queues. All tasks will be run '
+                'in the same cluster.')
+
+        def airflow_run():
+            return subprocess.check_call(command, shell=True)
+        future = self.client.submit(airflow_run, pure=False)
+        self.futures[future] = key
+
+    def _process_future(self, future):
+        if future.done():
+            key = self.futures[future]
+            if future.exception():
+                self.change_state(key, State.FAILED)
+                self.logger.error("Failed to execute task: {}".format(
+                    repr(future.exception())))
+            elif future.cancelled():
+                self.change_state(key, State.FAILED)
+                self.logger.error("Failed to execute task")
+            else:
+                self.change_state(key, State.SUCCESS)
+            self.futures.pop(future)
+
+    def sync(self):
+        # make a copy so futures can be popped during iteration
+        for future in self.futures.copy():
+            self._process_future(future)
+
+    def end(self):
+        for future in distributed.as_completed(self.futures):
+            self._process_future(future)
+
+    def terminate(self):
+        self.client.cancel(self.futures.keys())
+        self.end()

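The executor above is normally driven by BaseExecutor's heartbeat loop, but
its lifecycle is easy to exercise by hand. A rough sketch against an
in-process cluster, mirroring what the new tests do (for illustration only,
not how the scheduler invokes it in production):

    import time

    from distributed import LocalCluster

    from airflow.executors.dask_executor import DaskExecutor

    # LocalCluster stands in for a real dask-scheduler/dask-worker pair
    cluster = LocalCluster(nanny=False)
    executor = DaskExecutor(cluster_address=cluster.scheduler_address)
    executor.start()

    # each command runs as a shell subprocess on some Dask worker
    executor.execute_async(key='demo', command='echo 1')

    time.sleep(1)
    executor.sync()  # polls finished futures, records SUCCESS/FAILED
    executor.end()   # blocks until any remaining futures complete
    cluster.close()
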
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e221027/docs/configuration.rst
----------------------------------------------------------------------
diff --git a/docs/configuration.rst b/docs/configuration.rst
index c4a3442..5ff4284 100644
--- a/docs/configuration.rst
+++ b/docs/configuration.rst
@@ -131,6 +131,41 @@ to monitor your workers. You can use the shortcut command ``airflow flower``
 to start a Flower web server.
 
 
+Scaling Out with Dask
+'''''''''''''''''''''
+
+``DaskExecutor`` allows you to run Airflow tasks in a Dask Distributed cluster.
+
+Dask clusters can be run on a single machine or on remote networks. For complete
+details, consult the `Distributed documentation <https://distributed.readthedocs.io/>`_.
+
+To create a cluster, first start a Scheduler:
+
+.. code-block:: bash
+
+    # default settings for a local cluster
+    DASK_HOST=127.0.0.1
+    DASK_PORT=8786
+
+    dask-scheduler --host $DASK_HOST --port $DASK_PORT
+
+Next, start at least one Worker on any machine that can connect to the host:
+
+.. code-block:: bash
+
+    dask-worker $DASK_HOST:$DASK_PORT
+
+Edit your ``airflow.cfg`` to set your executor to ``DaskExecutor`` and provide
+the Dask Scheduler address in the ``[dask]`` section.
+
+Please note:
+
+- Each Dask worker must be able to import Airflow and any dependencies you
+  require.
+- Dask does not support queues. If an Airflow task was created with a queue, a
+  warning will be raised but the task will still be submitted to the cluster.
+
+
 Logs
 ''''
 Users can specify a logs folder in ``airflow.cfg``. By default, it is in

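The first caveat above (every worker must be able to import Airflow) is
worth verifying up front, since a worker running in the wrong environment
only fails at task runtime. One way to probe a live cluster is distributed's
Client.run, which executes a function on every worker; the address below is
assumed to match your [dask] cluster_address:

    from distributed import Client

    def airflow_importable():
        # raises ImportError on any worker missing the airflow package
        import airflow
        return airflow.__version__

    client = Client('127.0.0.1:8786')
    print(client.run(airflow_importable))  # {worker_address: version, ...}
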
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e221027/scripts/ci/requirements.txt
----------------------------------------------------------------------
diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt
index a5786f6..d969572 100644
--- a/scripts/ci/requirements.txt
+++ b/scripts/ci/requirements.txt
@@ -10,6 +10,7 @@ coveralls
 croniter
 cryptography
 dill
+distributed
 docker-py
 filechunkio
 flake8

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e221027/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index c644eed..a35e71b 100644
--- a/setup.py
+++ b/setup.py
@@ -112,6 +112,9 @@ cgroups = [
     'cgroupspy>=0.1.4',
 ]
 crypto = ['cryptography>=0.9.3']
+dask = [
+    'distributed>=1.15.2, <2'
+    ]
 datadog = ['datadog>=0.14.0']
 doc = [
     'sphinx>=1.2.3',
@@ -233,6 +236,7 @@ def do_setup():
             'cgroups': cgroups,
             'cloudant': cloudant,
             'crypto': crypto,
+            'dask': dask,
             'datadog': datadog,
             'devel': devel_minreq,
             'devel_hadoop': devel_hadoop,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e221027/tests/__init__.py
----------------------------------------------------------------------
diff --git a/tests/__init__.py b/tests/__init__.py
index 7ddf22d..0c0a01b 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -18,6 +18,7 @@ from .api import *
 from .configuration import *
 from .contrib import *
 from .core import *
+from .executors import *
 from .jobs import *
 from .impersonation import *
 from .models import *

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e221027/tests/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/executor/__init__.py b/tests/executor/__init__.py
deleted file mode 100644
index a85b772..0000000
--- a/tests/executor/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e221027/tests/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/executor/test_executor.py b/tests/executor/test_executor.py
deleted file mode 100644
index 2015d9c..0000000
--- a/tests/executor/test_executor.py
+++ /dev/null
@@ -1,33 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from airflow.executors.base_executor import BaseExecutor
-
-
-class TestExecutor(BaseExecutor):
-    """
-    TestExecutor is used for unit testing purposes.
-    """
-    def execute_async(self, key, command, queue=None):
-        self.logger.debug("{} running task instances".format(len(self.running)))
-        self.logger.debug("{} in queue".format(len(self.queued_tasks)))
-
-    def heartbeat(self):
-        pass
-
-    def terminate(self):
-        pass
-
-    def end(self):
-        self.sync()
-

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e221027/tests/executors/__init__.py
----------------------------------------------------------------------
diff --git a/tests/executors/__init__.py b/tests/executors/__init__.py
new file mode 100644
index 0000000..f694969
--- /dev/null
+++ b/tests/executors/__init__.py
@@ -0,0 +1,15 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .dask_executor import *

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e221027/tests/executors/dask_executor.py
----------------------------------------------------------------------
diff --git a/tests/executors/dask_executor.py b/tests/executors/dask_executor.py
new file mode 100644
index 0000000..deeb1bd
--- /dev/null
+++ b/tests/executors/dask_executor.py
@@ -0,0 +1,178 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import logging
+import time
+import unittest
+
+from airflow import configuration
+from airflow.models import DAG, DagBag, TaskInstance, State
+from airflow.jobs import BackfillJob
+from airflow.operators.python_operator import PythonOperator
+
+try:
+    from airflow.executors import DaskExecutor
+    from distributed import LocalCluster
+    SKIP_DASK = False
+except ImportError:
+    logging.error('Dask unavailable, skipping DaskExecutor tests')
+    SKIP_DASK = True
+
+if 'sqlite' in configuration.get('core', 'sql_alchemy_conn'):
+    logging.error('sqlite does not support concurrent access')
+    SKIP_DASK = True
+
+DEFAULT_DATE = datetime.datetime(2017, 1, 1)
+
+
+class DaskExecutorTest(unittest.TestCase):
+
+    def setUp(self):
+        self.dagbag = DagBag(include_examples=True)
+
+    @unittest.skipIf(SKIP_DASK, 'Dask unsupported by this configuration')
+    def test_dask_executor_functions(self):
+        cluster = LocalCluster(nanny=False)
+
+        executor = DaskExecutor(cluster_address=cluster.scheduler_address)
+
+        success_command = 'echo 1'
+        fail_command = 'exit 1'
+
+        executor.execute_async(key='success', command=success_command)
+        executor.execute_async(key='fail', command=fail_command)
+
+        success_future = next(
+            k for k, v in executor.futures.items() if v == 'success')
+        fail_future = next(
+            k for k, v in executor.futures.items() if v == 'fail')
+
+        # wait for the futures to execute, with a timeout
+        timeout = datetime.datetime.now() + datetime.timedelta(seconds=0.5)
+        while not (success_future.done() and fail_future.done()):
+            if datetime.datetime.now() > timeout:
+                raise ValueError(
+                    'The futures should have finished; there is probably '
+                    'an error communicating with the Dask cluster.')
+
+        # both tasks should have finished
+        self.assertTrue(success_future.done())
+        self.assertTrue(fail_future.done())
+
+        # check task exceptions
+        self.assertTrue(success_future.exception() is None)
+        self.assertTrue(fail_future.exception() is not None)
+
+        # tell the executor to shut down
+        executor.end()
+        self.assertTrue(len(executor.futures) == 0)
+
+        cluster.close()
+
+    @unittest.skipIf(SKIP_DASK, 'Dask unsupported by this configuration')
+    def test_submit_task_instance_to_dask_cluster(self):
+        """
+        Test that the DaskExecutor properly submits tasks to the cluster
+        """
+        cluster = LocalCluster(nanny=False)
+
+        executor = DaskExecutor(cluster_address=cluster.scheduler_address)
+
+        args = dict(
+            start_date=DEFAULT_DATE
+        )
+
+        def fail():
+            raise ValueError('Intentional failure.')
+
+        with DAG('test-dag', default_args=args) as dag:
+            # queue should be allowed, but ignored
+            success_operator = PythonOperator(
+                task_id='success',
+                python_callable=lambda: True,
+                queue='queue')
+
+            fail_operator = PythonOperator(
+                task_id='fail',
+                python_callable=fail)
+
+        success_ti = TaskInstance(
+            success_operator,
+            execution_date=DEFAULT_DATE)
+
+        fail_ti = TaskInstance(
+            fail_operator,
+            execution_date=DEFAULT_DATE)
+
+        # queue the tasks
+        executor.queue_task_instance(success_ti)
+        executor.queue_task_instance(fail_ti)
+
+        # the tasks haven't been submitted to the cluster yet
+        self.assertTrue(len(executor.futures) == 0)
+        # after the heartbeat, they have been submitted
+        executor.heartbeat()
+        self.assertTrue(len(executor.futures) == 2)
+
+        # wait a reasonable amount of time for the tasks to complete
+        for _ in range(2):
+            time.sleep(0.25)
+            executor.heartbeat()
+
+        # check that the futures were completed
+        if len(executor.futures) == 2:
+            raise ValueError('Failed to reach cluster before timeout.')
+        self.assertTrue(len(executor.futures) == 0)
+
+        # check that the taskinstances were updated
+        success_ti.refresh_from_db()
+        self.assertTrue(success_ti.state == State.SUCCESS)
+        fail_ti.refresh_from_db()
+        self.assertTrue(fail_ti.state == State.FAILED)
+
+        cluster.close()
+
+
+    @unittest.skipIf(SKIP_DASK, 'Dask unsupported by this configuration')
+    def test_backfill_integration(self):
+        """
+        Test that DaskExecutor can be used to backfill example dags
+        """
+        cluster = LocalCluster(nanny=False)
+
+        dags = [
+            dag for dag in self.dagbag.dags.values()
+            if dag.dag_id in [
+                'example_bash_operator',
+                # 'example_python_operator',
+            ]
+        ]
+
+        for dag in dags:
+            dag.clear(
+                start_date=DEFAULT_DATE,
+                end_date=DEFAULT_DATE)
+
+        for i, dag in enumerate(sorted(dags, key=lambda d: d.dag_id)):
+            job = BackfillJob(
+                dag=dag,
+                start_date=DEFAULT_DATE,
+                end_date=DEFAULT_DATE,
+                ignore_first_depends_on_past=True,
+                executor=DaskExecutor(
+                    cluster_address=cluster.scheduler_address))
+            job.run()
+
+        cluster.close()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e221027/tests/executors/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/executors/test_executor.py b/tests/executors/test_executor.py
new file mode 100644
index 0000000..2015d9c
--- /dev/null
+++ b/tests/executors/test_executor.py
@@ -0,0 +1,33 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from airflow.executors.base_executor import BaseExecutor
+
+
+class TestExecutor(BaseExecutor):
+    """
+    TestExecutor is used for unit testing purposes.
+    """
+    def execute_async(self, key, command, queue=None):
+        self.logger.debug("{} running task instances".format(len(self.running)))
+        self.logger.debug("{} in queue".format(len(self.queued_tasks)))
+
+    def heartbeat(self):
+        pass
+
+    def terminate(self):
+        pass
+
+    def end(self):
+        self.sync()
+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e221027/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 7f3c285..71470e3 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -37,7 +37,7 @@ from airflow.utils.state import State
 from airflow.utils.timeout import timeout
 from airflow.utils.dag_processing import SimpleDagBag
 from mock import patch
-from tests.executor.test_executor import TestExecutor
+from tests.executors.test_executor import TestExecutor
 
 from airflow import configuration
 configuration.load_test_config()