You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jl...@apache.org on 2017/02/12 21:06:40 UTC
incubator-airflow git commit: [AIRFLOW-862] Add DaskExecutor
Repository: incubator-airflow
Updated Branches:
refs/heads/master 94dc7fb0a -> 6e2210278
[AIRFLOW-862] Add DaskExecutor
Adds a DaskExecutor for running Airflow tasks
in Dask clusters.
Closes #2067 from jlowin/dask-executor
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6e221027
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6e221027
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6e221027
Branch: refs/heads/master
Commit: 6e2210278235d42bbc3a60e1e14bbf0f9127b54f
Parents: 94dc7fb
Author: Jeremiah Lowin <jl...@apache.org>
Authored: Sun Feb 12 16:06:31 2017 -0500
Committer: Jeremiah Lowin <jl...@apache.org>
Committed: Sun Feb 12 16:06:31 2017 -0500
----------------------------------------------------------------------
UPDATING.md | 8 ++
airflow/configuration.py | 10 +-
airflow/executors/__init__.py | 13 ++-
airflow/executors/dask_executor.py | 78 ++++++++++++++
docs/configuration.rst | 35 +++++++
scripts/ci/requirements.txt | 1 +
setup.py | 4 +
tests/__init__.py | 1 +
tests/executor/__init__.py | 13 ---
tests/executor/test_executor.py | 33 ------
tests/executors/__init__.py | 15 +++
tests/executors/dask_executor.py | 178 ++++++++++++++++++++++++++++++++
tests/executors/test_executor.py | 33 ++++++
tests/jobs.py | 2 +-
14 files changed, 369 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e221027/UPDATING.md
----------------------------------------------------------------------
diff --git a/UPDATING.md b/UPDATING.md
index b0ab212..ba708cd 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -3,6 +3,14 @@
This file documents any backwards-incompatible changes in Airflow and
assists people when migrating to a new version.
+## Master
+
+### New Features
+
+#### Dask Executor
+
+A new DaskExecutor allows Airflow tasks to be run in Dask Distributed clusters.
+
## Airflow 1.8
### Database
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e221027/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 6752bdb..cfccbe9 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -114,7 +114,7 @@ encrypt_s3_logs = False
s3_log_folder =
# The executor class that airflow should use. Choices include
-# SequentialExecutor, LocalExecutor, CeleryExecutor
+# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor
executor = SequentialExecutor
# The SqlAlchemy connection string to the metadata database.
@@ -333,6 +333,14 @@ flower_port = 5555
default_queue = default
+[dask]
+# This section only applies if you are using the DaskExecutor in the
+# [core] section above
+
+# The IP address and port of the Dask cluster's scheduler.
+cluster_address = 127.0.0.1:8786
+
+
[scheduler]
# Task instances listen for external kill signal (when you clear tasks
# from the CLI or the UI), this defines the frequency at which they should
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e221027/airflow/executors/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/executors/__init__.py b/airflow/executors/__init__.py
index 77f139e..cd78f69 100644
--- a/airflow/executors/__init__.py
+++ b/airflow/executors/__init__.py
@@ -20,11 +20,6 @@ from airflow.executors.base_executor import BaseExecutor
from airflow.executors.local_executor import LocalExecutor
from airflow.executors.sequential_executor import SequentialExecutor
-try:
- from airflow.executors.celery_executor import CeleryExecutor
-except:
- pass
-
from airflow.exceptions import AirflowException
@@ -39,10 +34,14 @@ _EXECUTOR = configuration.get('core', 'EXECUTOR')
if _EXECUTOR == 'LocalExecutor':
DEFAULT_EXECUTOR = LocalExecutor()
-elif _EXECUTOR == 'CeleryExecutor':
- DEFAULT_EXECUTOR = CeleryExecutor()
elif _EXECUTOR == 'SequentialExecutor':
DEFAULT_EXECUTOR = SequentialExecutor()
+elif _EXECUTOR == 'CeleryExecutor':
+ from airflow.executors.celery_executor import CeleryExecutor
+ DEFAULT_EXECUTOR = CeleryExecutor()
+elif _EXECUTOR == 'DaskExecutor':
+ from airflow.executors.dask_executor import DaskExecutor
+ DEFAULT_EXECUTOR = DaskExecutor()
elif _EXECUTOR == 'MesosExecutor':
from airflow.contrib.executors.mesos_executor import MesosExecutor
DEFAULT_EXECUTOR = MesosExecutor()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e221027/airflow/executors/dask_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/dask_executor.py b/airflow/executors/dask_executor.py
new file mode 100644
index 0000000..9aa7426
--- /dev/null
+++ b/airflow/executors/dask_executor.py
@@ -0,0 +1,78 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import distributed
+
+import subprocess
+import warnings
+
+from airflow import configuration
+from airflow.executors.base_executor import BaseExecutor
+from airflow.utils.state import State
+
+
+class DaskExecutor(BaseExecutor):
+ """
+ DaskExecutor submits tasks to a Dask Distributed cluster.
+ """
+ def __init__(self, cluster_address=None):
+ if cluster_address is None:
+ cluster_address = configuration.get('dask', 'cluster_address')
+ if not cluster_address:
+ raise ValueError(
+ 'Please provide a Dask cluster address in airflow.cfg')
+ self.cluster_address = cluster_address
+ super(DaskExecutor, self).__init__(parallelism=0)
+
+ def start(self):
+ self.client = distributed.Client(self.cluster_address)
+ self.futures = {}
+
+ def execute_async(self, key, command, queue=None):
+ if queue is not None:
+ warnings.warning(
+ 'DaskExecutor does not support queues. All tasks will be run '
+ 'in the same cluster')
+
+ def airflow_run():
+ return subprocess.check_call(command, shell=True)
+ future = self.client.submit(airflow_run, pure=False)
+ self.futures[future] = key
+
+ def _process_future(self, future):
+ if future.done():
+ key = self.futures[future]
+ if future.exception():
+ self.change_state(key, State.FAILED)
+ self.logger.error("Failed to execute task: {}".format(
+ repr(future.exception())))
+ elif future.cancelled():
+ self.change_state(key, State.FAILED)
+ self.logger.error("Failed to execute task")
+ else:
+ self.change_state(key, State.SUCCESS)
+ self.futures.pop(future)
+
+ def sync(self):
+ # make a copy so futures can be popped during iteration
+ for future in self.futures.copy():
+ self._process_future(future)
+
+ def end(self):
+ for future in distributed.as_completed(self.futures):
+ self._process_future(future)
+
+ def terminate(self):
+ self.client.cancel(self.futures.keys())
+ self.end()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e221027/docs/configuration.rst
----------------------------------------------------------------------
diff --git a/docs/configuration.rst b/docs/configuration.rst
index c4a3442..5ff4284 100644
--- a/docs/configuration.rst
+++ b/docs/configuration.rst
@@ -131,6 +131,41 @@ to monitor your workers. You can use the shortcut command ``airflow flower``
to start a Flower web server.
+Scaling Out with Dask
+'''''''''''''''''''''
+
+``DaskExecutor`` allows you to run Airflow tasks in a Dask Distributed cluster.
+
+Dask clusters can be run on a single machine or on remote networks. For complete
+details, consult the `Distributed documentation <https://distributed.readthedocs.io/>`_.
+
+To create a cluster, first start a Scheduler:
+
+.. code-block:: bash
+
+ # default settings for a local cluster
+ DASK_HOST=127.0.0.1
+ DASK_PORT=8786
+
+ dask-scheduler --host $DASK_HOST --port $DASK_PORT
+
+Next, start at least one Worker on any machine that can connect to the scheduler:
+
+.. code-block:: bash
+
+ dask-worker $DASK_HOST:$DASK_PORT
+
+Edit your ``airflow.cfg`` to set your executor to ``DaskExecutor`` and provide
+the Dask Scheduler address in the ``[dask]`` section.
+
+Please note:
+
+- Each Dask worker must be able to import Airflow and any dependencies you
+ require.
+- Dask does not support queues. If an Airflow task was created with a queue, a
+  warning will be raised, the queue will be ignored, and the task will still be
+  submitted to the cluster.
+
+
Logs
''''
Users can specify a logs folder in ``airflow.cfg``. By default, it is in
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e221027/scripts/ci/requirements.txt
----------------------------------------------------------------------
diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt
index a5786f6..d969572 100644
--- a/scripts/ci/requirements.txt
+++ b/scripts/ci/requirements.txt
@@ -10,6 +10,7 @@ coveralls
croniter
cryptography
dill
+distributed
docker-py
filechunkio
flake8
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e221027/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index c644eed..a35e71b 100644
--- a/setup.py
+++ b/setup.py
@@ -112,6 +112,9 @@ cgroups = [
'cgroupspy>=0.1.4',
]
crypto = ['cryptography>=0.9.3']
+dask = [
+ 'distributed>=1.15.2, <2'
+ ]
datadog = ['datadog>=0.14.0']
doc = [
'sphinx>=1.2.3',
@@ -233,6 +236,7 @@ def do_setup():
'cgroups': cgroups,
'cloudant': cloudant,
'crypto': crypto,
+ 'dask': dask,
'datadog': datadog,
'devel': devel_minreq,
'devel_hadoop': devel_hadoop,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e221027/tests/__init__.py
----------------------------------------------------------------------
diff --git a/tests/__init__.py b/tests/__init__.py
index 7ddf22d..0c0a01b 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -18,6 +18,7 @@ from .api import *
from .configuration import *
from .contrib import *
from .core import *
+from .executors import *
from .jobs import *
from .impersonation import *
from .models import *
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e221027/tests/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/executor/__init__.py b/tests/executor/__init__.py
deleted file mode 100644
index a85b772..0000000
--- a/tests/executor/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e221027/tests/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/executor/test_executor.py b/tests/executor/test_executor.py
deleted file mode 100644
index 2015d9c..0000000
--- a/tests/executor/test_executor.py
+++ /dev/null
@@ -1,33 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from airflow.executors.base_executor import BaseExecutor
-
-
-class TestExecutor(BaseExecutor):
- """
- TestExecutor is used for unit testing purposes.
- """
- def execute_async(self, key, command, queue=None):
- self.logger.debug("{} running task instances".format(len(self.running)))
- self.logger.debug("{} in queue".format(len(self.queued_tasks)))
-
- def heartbeat(self):
- pass
-
- def terminate(self):
- pass
-
- def end(self):
- self.sync()
-
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e221027/tests/executors/__init__.py
----------------------------------------------------------------------
diff --git a/tests/executors/__init__.py b/tests/executors/__init__.py
new file mode 100644
index 0000000..f694969
--- /dev/null
+++ b/tests/executors/__init__.py
@@ -0,0 +1,15 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .dask_executor import *
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e221027/tests/executors/dask_executor.py
----------------------------------------------------------------------
diff --git a/tests/executors/dask_executor.py b/tests/executors/dask_executor.py
new file mode 100644
index 0000000..deeb1bd
--- /dev/null
+++ b/tests/executors/dask_executor.py
@@ -0,0 +1,178 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import logging
+import time
+import unittest
+
+from airflow import configuration
+from airflow.models import DAG, DagBag, TaskInstance, State
+from airflow.jobs import BackfillJob
+from airflow.operators.python_operator import PythonOperator
+
+try:
+ from airflow.executors import DaskExecutor
+ from distributed import LocalCluster
+ SKIP_DASK = False
+except ImportError:
+ logging.error('Dask unavailable, skipping DaskExecutor tests')
+ SKIP_DASK = True
+
+if 'sqlite' in configuration.get('core', 'sql_alchemy_conn'):
+ logging.error('sqlite does not support concurrent access')
+ SKIP_DASK = True
+
+DEFAULT_DATE = datetime.datetime(2017, 1, 1)
+
+
+class DaskExecutorTest(unittest.TestCase):
+
+ def setUp(self):
+ self.dagbag = DagBag(include_examples=True)
+
+ @unittest.skipIf(SKIP_DASK, 'Dask unsupported by this configuration')
+ def test_dask_executor_functions(self):
+ cluster = LocalCluster(nanny=False)
+
+ executor = DaskExecutor(cluster_address=cluster.scheduler_address)
+
+ success_command = 'echo 1'
+ fail_command = 'exit 1'
+
+ executor.execute_async(key='success', command=success_command)
+ executor.execute_async(key='fail', command=fail_command)
+
+ success_future = next(
+ k for k, v in executor.futures.items() if v == 'success')
+ fail_future = next(
+ k for k, v in executor.futures.items() if v == 'fail')
+
+ # wait for the futures to execute, with a timeout
+ timeout = datetime.datetime.now() + datetime.timedelta(seconds=0.5)
+ while not (success_future.done() and fail_future.done()):
+ if datetime.datetime.now() > timeout:
+ raise ValueError(
+ 'The futures should have finished; there is probably '
+ 'an error communciating with the Dask cluster.')
+
+ # both tasks should have finished
+ self.assertTrue(success_future.done())
+ self.assertTrue(fail_future.done())
+
+ # check task exceptions
+ self.assertTrue(success_future.exception() is None)
+ self.assertTrue(fail_future.exception() is not None)
+
+ # tell the executor to shut down
+ executor.end()
+ self.assertTrue(len(executor.futures) == 0)
+
+ cluster.close()
+
+ @unittest.skipIf(SKIP_DASK, 'Dask unsupported by this configuration')
+ def test_submit_task_instance_to_dask_cluster(self):
+ """
+ Test that the DaskExecutor properly submits tasks to the cluster
+ """
+ cluster = LocalCluster(nanny=False)
+
+ executor = DaskExecutor(cluster_address=cluster.scheduler_address)
+
+ args = dict(
+ start_date=DEFAULT_DATE
+ )
+
+ def fail():
+ raise ValueError('Intentional failure.')
+
+ with DAG('test-dag', default_args=args) as dag:
+ # queue should be allowed, but ignored
+ success_operator = PythonOperator(
+ task_id='success',
+ python_callable=lambda: True,
+ queue='queue')
+
+ fail_operator = PythonOperator(
+ task_id='fail',
+ python_callable=fail)
+
+ success_ti = TaskInstance(
+ success_operator,
+ execution_date=DEFAULT_DATE)
+
+ fail_ti = TaskInstance(
+ fail_operator,
+ execution_date=DEFAULT_DATE)
+
+ # queue the tasks
+ executor.queue_task_instance(success_ti)
+ executor.queue_task_instance(fail_ti)
+
+ # the tasks haven't been submitted to the cluster yet
+ self.assertTrue(len(executor.futures) == 0)
+ # after the heartbeat, they have been submitted
+ executor.heartbeat()
+ self.assertTrue(len(executor.futures) == 2)
+
+ # wait a reasonable amount of time for the tasks to complete
+ for _ in range(2):
+ time.sleep(0.25)
+ executor.heartbeat()
+
+ # check that the futures were completed
+ if len(executor.futures) == 2:
+ raise ValueError('Failed to reach cluster before timeout.')
+ self.assertTrue(len(executor.futures) == 0)
+
+ # check that the taskinstances were updated
+ success_ti.refresh_from_db()
+ self.assertTrue(success_ti.state == State.SUCCESS)
+ fail_ti.refresh_from_db()
+ self.assertTrue(fail_ti.state == State.FAILED)
+
+ cluster.close()
+
+
+ @unittest.skipIf(SKIP_DASK, 'Dask unsupported by this configuration')
+ def test_backfill_integration(self):
+ """
+ Test that DaskExecutor can be used to backfill example dags
+ """
+ cluster = LocalCluster(nanny=False)
+
+ dags = [
+ dag for dag in self.dagbag.dags.values()
+ if dag.dag_id in [
+ 'example_bash_operator',
+ # 'example_python_operator',
+ ]
+ ]
+
+ for dag in dags:
+ dag.clear(
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE)
+
+ for i, dag in enumerate(sorted(dags, key=lambda d: d.dag_id)):
+ job = BackfillJob(
+ dag=dag,
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE,
+ ignore_first_depends_on_past=True,
+ executor=DaskExecutor(
+ cluster_address=cluster.scheduler_address))
+ job.run()
+
+ cluster.close()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e221027/tests/executors/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/executors/test_executor.py b/tests/executors/test_executor.py
new file mode 100644
index 0000000..2015d9c
--- /dev/null
+++ b/tests/executors/test_executor.py
@@ -0,0 +1,33 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from airflow.executors.base_executor import BaseExecutor
+
+
+class TestExecutor(BaseExecutor):
+ """
+ TestExecutor is used for unit testing purposes.
+ """
+ def execute_async(self, key, command, queue=None):
+ self.logger.debug("{} running task instances".format(len(self.running)))
+ self.logger.debug("{} in queue".format(len(self.queued_tasks)))
+
+ def heartbeat(self):
+ pass
+
+ def terminate(self):
+ pass
+
+ def end(self):
+ self.sync()
+
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e221027/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 7f3c285..71470e3 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -37,7 +37,7 @@ from airflow.utils.state import State
from airflow.utils.timeout import timeout
from airflow.utils.dag_processing import SimpleDagBag
from mock import patch
-from tests.executor.test_executor import TestExecutor
+from tests.executors.test_executor import TestExecutor
from airflow import configuration
configuration.load_test_config()