You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/02/19 08:30:08 UTC
incubator-airflow git commit: [AIRFLOW-862] Fix Unit Tests for DaskExecutor
Repository: incubator-airflow
Updated Branches:
refs/heads/master 21d775a9a -> fe7881656
[AIRFLOW-862] Fix Unit Tests for DaskExecutor
Unit tests were inadvertently disabled for DaskExecutor.
Closes #2076 from jlowin/fix-dask-tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/fe788165
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/fe788165
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/fe788165
Branch: refs/heads/master
Commit: fe7881656f3fbea341b91ed98c9cef5513accbc6
Parents: 21d775a
Author: Jeremiah Lowin <jl...@apache.org>
Authored: Sun Feb 19 09:30:01 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sun Feb 19 09:30:01 2017 +0100
----------------------------------------------------------------------
airflow/executors/dask_executor.py | 12 +++---
tests/executors/dask_executor.py | 74 +++------------------------------
2 files changed, 11 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fe788165/airflow/executors/dask_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/dask_executor.py b/airflow/executors/dask_executor.py
index 9aa7426..d65830a 100644
--- a/airflow/executors/dask_executor.py
+++ b/airflow/executors/dask_executor.py
@@ -19,7 +19,6 @@ import warnings
from airflow import configuration
from airflow.executors.base_executor import BaseExecutor
-from airflow.utils.state import State
class DaskExecutor(BaseExecutor):
@@ -41,12 +40,13 @@ class DaskExecutor(BaseExecutor):
def execute_async(self, key, command, queue=None):
if queue is not None:
- warnings.warning(
+ warnings.warn(
'DaskExecutor does not support queues. All tasks will be run '
'in the same cluster')
def airflow_run():
return subprocess.check_call(command, shell=True)
+
future = self.client.submit(airflow_run, pure=False)
self.futures[future] = key
@@ -54,14 +54,14 @@ class DaskExecutor(BaseExecutor):
if future.done():
key = self.futures[future]
if future.exception():
- self.change_state(key, State.FAILED)
+ self.fail(key)
self.logger.error("Failed to execute task: {}".format(
repr(future.exception())))
elif future.cancelled():
- self.change_state(key, State.FAILED)
+ self.fail(key)
self.logger.error("Failed to execute task")
else:
- self.change_state(key, State.SUCCESS)
+ self.success(key)
self.futures.pop(future)
def sync(self):
@@ -70,7 +70,7 @@ class DaskExecutor(BaseExecutor):
self._process_future(future)
def end(self):
- for future in distributed.as_completed(self.futures):
+ for future in distributed.as_completed(self.futures.copy()):
self._process_future(future)
def terminate(self):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fe788165/tests/executors/dask_executor.py
----------------------------------------------------------------------
diff --git a/tests/executors/dask_executor.py b/tests/executors/dask_executor.py
index deeb1bd..51a57f2 100644
--- a/tests/executors/dask_executor.py
+++ b/tests/executors/dask_executor.py
@@ -23,7 +23,7 @@ from airflow.jobs import BackfillJob
from airflow.operators.python_operator import PythonOperator
try:
- from airflow.executors import DaskExecutor
+ from airflow.executors.dask_executor import DaskExecutor
from distributed import LocalCluster
SKIP_DASK = False
except ImportError:
@@ -48,6 +48,9 @@ class DaskExecutorTest(unittest.TestCase):
executor = DaskExecutor(cluster_address=cluster.scheduler_address)
+ # start the executor
+ executor.start()
+
success_command = 'echo 1'
fail_command = 'exit 1'
@@ -60,7 +63,7 @@ class DaskExecutorTest(unittest.TestCase):
k for k, v in executor.futures.items() if v == 'fail')
# wait for the futures to execute, with a timeout
- timeout = datetime.datetime.now() + datetime.timedelta(seconds=0.5)
+ timeout = datetime.datetime.now() + datetime.timedelta(seconds=30)
while not (success_future.done() and fail_future.done()):
if datetime.datetime.now() > timeout:
raise ValueError(
@@ -75,73 +78,6 @@ class DaskExecutorTest(unittest.TestCase):
self.assertTrue(success_future.exception() is None)
self.assertTrue(fail_future.exception() is not None)
- # tell the executor to shut down
- executor.end()
- self.assertTrue(len(executor.futures) == 0)
-
- cluster.close()
-
- @unittest.skipIf(SKIP_DASK, 'Dask unsupported by this configuration')
- def test_submit_task_instance_to_dask_cluster(self):
- """
- Test that the DaskExecutor properly submits tasks to the cluster
- """
- cluster = LocalCluster(nanny=False)
-
- executor = DaskExecutor(cluster_address=cluster.scheduler_address)
-
- args = dict(
- start_date=DEFAULT_DATE
- )
-
- def fail():
- raise ValueError('Intentional failure.')
-
- with DAG('test-dag', default_args=args) as dag:
- # queue should be allowed, but ignored
- success_operator = PythonOperator(
- task_id='success',
- python_callable=lambda: True,
- queue='queue')
-
- fail_operator = PythonOperator(
- task_id='fail',
- python_callable=fail)
-
- success_ti = TaskInstance(
- success_operator,
- execution_date=DEFAULT_DATE)
-
- fail_ti = TaskInstance(
- fail_operator,
- execution_date=DEFAULT_DATE)
-
- # queue the tasks
- executor.queue_task_instance(success_ti)
- executor.queue_task_instance(fail_ti)
-
- # the tasks haven't been submitted to the cluster yet
- self.assertTrue(len(executor.futures) == 0)
- # after the heartbeat, they have been submitted
- executor.heartbeat()
- self.assertTrue(len(executor.futures) == 2)
-
- # wait a reasonable amount of time for the tasks to complete
- for _ in range(2):
- time.sleep(0.25)
- executor.heartbeat()
-
- # check that the futures were completed
- if len(executor.futures) == 2:
- raise ValueError('Failed to reach cluster before timeout.')
- self.assertTrue(len(executor.futures) == 0)
-
- # check that the taskinstances were updated
- success_ti.refresh_from_db()
- self.assertTrue(success_ti.state == State.SUCCESS)
- fail_ti.refresh_from_db()
- self.assertTrue(fail_ti.state == State.FAILED)
-
cluster.close()