You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/02/19 08:30:08 UTC

incubator-airflow git commit: [AIRFLOW-862] Fix Unit Tests for DaskExecutor

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 21d775a9a -> fe7881656


[AIRFLOW-862] Fix Unit Tests for DaskExecutor

Unit tests were inadvertently disabled for DaskExecutor.

Closes #2076 from jlowin/fix-dask-tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/fe788165
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/fe788165
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/fe788165

Branch: refs/heads/master
Commit: fe7881656f3fbea341b91ed98c9cef5513accbc6
Parents: 21d775a
Author: Jeremiah Lowin <jl...@apache.org>
Authored: Sun Feb 19 09:30:01 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sun Feb 19 09:30:01 2017 +0100

----------------------------------------------------------------------
 airflow/executors/dask_executor.py | 12 +++---
 tests/executors/dask_executor.py   | 74 +++------------------------------
 2 files changed, 11 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fe788165/airflow/executors/dask_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/dask_executor.py b/airflow/executors/dask_executor.py
index 9aa7426..d65830a 100644
--- a/airflow/executors/dask_executor.py
+++ b/airflow/executors/dask_executor.py
@@ -19,7 +19,6 @@ import warnings
 
 from airflow import configuration
 from airflow.executors.base_executor import BaseExecutor
-from airflow.utils.state import State
 
 
 class DaskExecutor(BaseExecutor):
@@ -41,12 +40,13 @@ class DaskExecutor(BaseExecutor):
 
     def execute_async(self, key, command, queue=None):
         if queue is not None:
-            warnings.warning(
+            warnings.warn(
                 'DaskExecutor does not support queues. All tasks will be run '
                 'in the same cluster')
 
         def airflow_run():
             return subprocess.check_call(command, shell=True)
+
         future = self.client.submit(airflow_run, pure=False)
         self.futures[future] = key
 
@@ -54,14 +54,14 @@ class DaskExecutor(BaseExecutor):
         if future.done():
             key = self.futures[future]
             if future.exception():
-                self.change_state(key, State.FAILED)
+                self.fail(key)
                 self.logger.error("Failed to execute task: {}".format(
                     repr(future.exception())))
             elif future.cancelled():
-                self.change_state(key, State.FAILED)
+                self.fail(key)
                 self.logger.error("Failed to execute task")
             else:
-                self.change_state(key, State.SUCCESS)
+                self.success(key)
             self.futures.pop(future)
 
     def sync(self):
@@ -70,7 +70,7 @@ class DaskExecutor(BaseExecutor):
             self._process_future(future)
 
     def end(self):
-        for future in distributed.as_completed(self.futures):
+        for future in distributed.as_completed(self.futures.copy()):
             self._process_future(future)
 
     def terminate(self):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fe788165/tests/executors/dask_executor.py
----------------------------------------------------------------------
diff --git a/tests/executors/dask_executor.py b/tests/executors/dask_executor.py
index deeb1bd..51a57f2 100644
--- a/tests/executors/dask_executor.py
+++ b/tests/executors/dask_executor.py
@@ -23,7 +23,7 @@ from airflow.jobs import BackfillJob
 from airflow.operators.python_operator import PythonOperator
 
 try:
-    from airflow.executors import DaskExecutor
+    from airflow.executors.dask_executor import DaskExecutor
     from distributed import LocalCluster
     SKIP_DASK = False
 except ImportError:
@@ -48,6 +48,9 @@ class DaskExecutorTest(unittest.TestCase):
 
         executor = DaskExecutor(cluster_address=cluster.scheduler_address)
 
+        # start the executor
+        executor.start()
+
         success_command = 'echo 1'
         fail_command = 'exit 1'
 
@@ -60,7 +63,7 @@ class DaskExecutorTest(unittest.TestCase):
             k for k, v in executor.futures.items() if v == 'fail')
 
         # wait for the futures to execute, with a timeout
-        timeout = datetime.datetime.now() + datetime.timedelta(seconds=0.5)
+        timeout = datetime.datetime.now() + datetime.timedelta(seconds=30)
         while not (success_future.done() and fail_future.done()):
             if datetime.datetime.now() > timeout:
                 raise ValueError(
@@ -75,73 +78,6 @@ class DaskExecutorTest(unittest.TestCase):
         self.assertTrue(success_future.exception() is None)
         self.assertTrue(fail_future.exception() is not None)
 
-        # tell the executor to shut down
-        executor.end()
-        self.assertTrue(len(executor.futures) == 0)
-
-        cluster.close()
-
-    @unittest.skipIf(SKIP_DASK, 'Dask unsupported by this configuration')
-    def test_submit_task_instance_to_dask_cluster(self):
-        """
-        Test that the DaskExecutor properly submits tasks to the cluster
-        """
-        cluster = LocalCluster(nanny=False)
-
-        executor = DaskExecutor(cluster_address=cluster.scheduler_address)
-
-        args = dict(
-            start_date=DEFAULT_DATE
-        )
-
-        def fail():
-            raise ValueError('Intentional failure.')
-
-        with DAG('test-dag', default_args=args) as dag:
-            # queue should be allowed, but ignored
-            success_operator = PythonOperator(
-                task_id='success',
-                python_callable=lambda: True,
-                queue='queue')
-
-            fail_operator = PythonOperator(
-                task_id='fail',
-                python_callable=fail)
-
-        success_ti = TaskInstance(
-            success_operator,
-            execution_date=DEFAULT_DATE)
-
-        fail_ti = TaskInstance(
-            fail_operator,
-            execution_date=DEFAULT_DATE)
-
-        # queue the tasks
-        executor.queue_task_instance(success_ti)
-        executor.queue_task_instance(fail_ti)
-
-        # the tasks haven't been submitted to the cluster yet
-        self.assertTrue(len(executor.futures) == 0)
-        # after the heartbeat, they have been submitted
-        executor.heartbeat()
-        self.assertTrue(len(executor.futures) == 2)
-
-        # wait a reasonable amount of time for the tasks to complete
-        for _ in range(2):
-            time.sleep(0.25)
-            executor.heartbeat()
-
-        # check that the futures were completed
-        if len(executor.futures) == 2:
-            raise ValueError('Failed to reach cluster before timeout.')
-        self.assertTrue(len(executor.futures) == 0)
-
-        # check that the taskinstances were updated
-        success_ti.refresh_from_db()
-        self.assertTrue(success_ti.state == State.SUCCESS)
-        fail_ti.refresh_from_db()
-        self.assertTrue(fail_ti.state == State.FAILED)
-
         cluster.close()