You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by da...@apache.org on 2017/10/17 18:39:27 UTC

incubator-airflow git commit: [AIRFLOW-1631] Fix local executor unbound parallelism

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 707ab6952 -> cdfced324


[AIRFLOW-1631] Fix local executor unbound parallelism

Before, if unlimited parallelism was requested by passing `0` as the
parallelism value, the local executor would stall, since no workers
were created, violating the BaseExecutor contract for the
parallelism option.

Now, if unbounded parallelism is used, a process is created on demand
for each task submitted for execution.

Closes #2658 from edgarRd/erod-localexecutor-fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/cdfced32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/cdfced32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/cdfced32

Branch: refs/heads/master
Commit: cdfced3248c7f14b639919c093f4f3042deb754b
Parents: 707ab69
Author: Edgar Rodriguez <ed...@airbnb.com>
Authored: Tue Oct 17 11:39:06 2017 -0700
Committer: Dan Davydov <da...@airbnb.com>
Committed: Tue Oct 17 11:39:22 2017 -0700

----------------------------------------------------------------------
 airflow/executors/local_executor.py    | 205 +++++++++++++++++++++++-----
 tests/executors/test_local_executor.py |  74 ++++++++++
 2 files changed, 244 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cdfced32/airflow/executors/local_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index f9eceb3..9b4f8e1 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -11,6 +11,33 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+"""
+LocalExecutor runs tasks by spawning processes in a controlled fashion in different
+modes. Given that BaseExecutor has the option to receive a `parallelism` parameter to
+limit the number of processes spawned, when this parameter is `0` the number of processes
+that LocalExecutor can spawn is unlimited.
+
+The following strategies are implemented:
+1. Unlimited Parallelism (self.parallelism == 0): In this strategy, LocalExecutor will
+spawn a process every time `execute_async` is called, that is, every task submitted to the
+LocalExecutor will be executed in its own process. Once the task is executed and the
+result stored in the `result_queue`, the process terminates. There is no need for a
+`task_queue` in this approach, since as soon as a task is received a new process will be
+allocated to the task. Processes used in this strategy are of class LocalWorker.
+
+2. Limited Parallelism (self.parallelism > 0): In this strategy, the LocalExecutor spawns
+the number of processes equal to the value of `self.parallelism` at `start` time,
+using a `task_queue` to coordinate the ingestion of tasks and the work distribution among
+the workers, which will take a task as soon as they are ready. During the lifecycle of
+the LocalExecutor, the worker processes are running, waiting for tasks. Once the
+LocalExecutor receives the call to shut down the executor, a poison token is sent to the
+workers to terminate them. Processes used in this strategy are of class QueuedLocalWorker.
+
+Arguably, `SequentialExecutor` could be thought of as a LocalExecutor with limited
+parallelism of just 1 worker, i.e. `self.parallelism = 1`.
+This option could lead to the unification of the executor implementations, running
+locally, into just one `LocalExecutor` with multiple modes.
+"""
 
 import multiprocessing
 import subprocess
@@ -18,20 +45,63 @@ import time
 
 from builtins import range
 
-from airflow import configuration
 from airflow.executors.base_executor import BaseExecutor
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import State
 
-PARALLELISM = configuration.get('core', 'PARALLELISM')
-
 
 class LocalWorker(multiprocessing.Process, LoggingMixin):
+
+    """LocalWorker Process implementation to run airflow commands. Executes the given
+    command and puts the result into a result queue when done, terminating execution."""
+
+    def __init__(self, result_queue):
+        """
+        :param result_queue: the queue to store result states tuples (key, State)
+        :type result_queue: multiprocessing.Queue
+        """
+        super(LocalWorker, self).__init__()
+        self.daemon = True
+        self.result_queue = result_queue
+        self.key = None
+        self.command = None
+
+    def execute_work(self, key, command):
+        """
+        Executes command received and stores result state in queue.
+        :param key: the key to identify the TI
+        :type key: Tuple(dag_id, task_id, execution_date)
+        :param command: the command to execute
+        :type command: string
+        """
+        if key is None:
+            return
+        self.log.info("%s running %s", self.__class__.__name__, command)
+        command = "exec bash -c '{0}'".format(command)
+        try:
+            subprocess.check_call(command, shell=True)
+            state = State.SUCCESS
+        except subprocess.CalledProcessError as e:
+            state = State.FAILED
+            self.log.error("Failed to execute task %s.", str(e))
+            # TODO: Why is this commented out?
+            # raise e
+        self.result_queue.put((key, state))
+
+    def run(self):
+        self.execute_work(self.key, self.command)
+        time.sleep(1)
+
+
+class QueuedLocalWorker(LocalWorker):
+
+    """LocalWorker implementation that is waiting for tasks from a queue and will
+    continue executing commands as they become available in the queue. It will terminate
+    execution once the poison token is found."""
+
     def __init__(self, task_queue, result_queue):
-        multiprocessing.Process.__init__(self)
+        super(QueuedLocalWorker, self).__init__(result_queue=result_queue)
         self.task_queue = task_queue
-        self.result_queue = result_queue
-        self.daemon = True
 
     def run(self):
         while True:
@@ -40,17 +110,7 @@ class LocalWorker(multiprocessing.Process, LoggingMixin):
                 # Received poison pill, no more tasks to run
                 self.task_queue.task_done()
                 break
-            self.log.info("%s running %s", self.__class__.__name__, command)
-            command = "exec bash -c '{0}'".format(command)
-            try:
-                subprocess.check_call(command, shell=True)
-                state = State.SUCCESS
-            except subprocess.CalledProcessError as e:
-                state = State.FAILED
-                self.log.error("Failed to execute task %s.", str(e))
-                # TODO: Why is this commented out?
-                # raise e
-            self.result_queue.put((key, state))
+            self.execute_work(key, command)
             self.task_queue.task_done()
             time.sleep(1)
 
@@ -62,30 +122,105 @@ class LocalExecutor(BaseExecutor):
     of tasks.
     """
 
+    class _UnlimitedParallelism(object):
+        """Implements LocalExecutor with unlimited parallelism, starting one process
+        per each command to execute."""
+
+        def __init__(self, executor):
+            """
+            :param executor: the executor instance to implement.
+            :type executor: LocalExecutor
+            """
+            self.executor = executor
+
+        def start(self):
+            self.executor.workers_used = 0
+            self.executor.workers_active = 0
+
+        def execute_async(self, key, command):
+            """
+            :param key: the key to identify the TI
+            :type key: Tuple(dag_id, task_id, execution_date)
+            :param command: the command to execute
+            :type command: string
+            """
+            local_worker = LocalWorker(self.executor.result_queue)
+            local_worker.key = key
+            local_worker.command = command
+            self.executor.workers_used += 1
+            self.executor.workers_active += 1
+            local_worker.start()
+
+        def sync(self):
+            while not self.executor.result_queue.empty():
+                results = self.executor.result_queue.get()
+                self.executor.change_state(*results)
+                self.executor.workers_active -= 1
+
+        def end(self):
+            while self.executor.workers_active > 0:
+                self.executor.sync()
+                time.sleep(1)
+
+    class _LimitedParallelism(object):
+        """Implements LocalExecutor with limited parallelism using a task queue to
+        coordinate work distribution."""
+
+        def __init__(self, executor):
+            self.executor = executor
+
+        def start(self):
+            self.executor.queue = multiprocessing.JoinableQueue()
+
+            self.executor.workers = [
+                QueuedLocalWorker(self.executor.queue, self.executor.result_queue)
+                for _ in range(self.executor.parallelism)
+            ]
+
+            self.executor.workers_used = len(self.executor.workers)
+
+            for w in self.executor.workers:
+                w.start()
+
+        def execute_async(self, key, command):
+            """
+            :param key: the key to identify the TI
+            :type key: Tuple(dag_id, task_id, execution_date)
+            :param command: the command to execute
+            :type command: string
+            """
+            self.executor.queue.put((key, command))
+
+        def sync(self):
+            while not self.executor.result_queue.empty():
+                results = self.executor.result_queue.get()
+                self.executor.change_state(*results)
+
+        def end(self):
+            # Sending poison pill to all worker
+            for _ in self.executor.workers:
+                self.executor.queue.put((None, None))
+
+            # Wait for commands to finish
+            self.executor.queue.join()
+            self.executor.sync()
+
     def start(self):
-        self.queue = multiprocessing.JoinableQueue()
         self.result_queue = multiprocessing.Queue()
-        self.workers = [
-            LocalWorker(self.queue, self.result_queue)
-            for _ in range(self.parallelism)
-        ]
+        self.queue = None
+        self.workers = []
+        self.workers_used = 0
+        self.workers_active = 0
+        self.impl = (LocalExecutor._UnlimitedParallelism(self) if self.parallelism == 0
+                     else LocalExecutor._LimitedParallelism(self))
 
-        for w in self.workers:
-            w.start()
+        self.impl.start()
 
     def execute_async(self, key, command, queue=None):
-        self.queue.put((key, command))
+        self.impl.execute_async(key=key, command=command)
 
     def sync(self):
-        while not self.result_queue.empty():
-            results = self.result_queue.get()
-            self.change_state(*results)
+        self.impl.sync()
 
     def end(self):
-        # Sending poison pill to all worker
-        for _ in self.workers:
-            self.queue.put((None, None))
-
-        # Wait for commands to finish
-        self.queue.join()
-        self.sync()
+        self.impl.end()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cdfced32/tests/executors/test_local_executor.py
----------------------------------------------------------------------
diff --git a/tests/executors/test_local_executor.py b/tests/executors/test_local_executor.py
new file mode 100644
index 0000000..bca6354
--- /dev/null
+++ b/tests/executors/test_local_executor.py
@@ -0,0 +1,74 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+from airflow.executors.local_executor import LocalExecutor
+from airflow.utils.state import State
+from airflow.utils.timeout import timeout
+
+
+class LocalExecutorTest(unittest.TestCase):
+
+    TEST_SUCCESS_COMMANDS = 5
+
+    def execution_parallelism(self, parallelism=0):
+        executor = LocalExecutor(parallelism=parallelism)
+        executor.start()
+
+        success_key = 'success {}'
+        success_command = 'echo {}'
+        fail_command = 'exit 1'
+
+        for i in range(self.TEST_SUCCESS_COMMANDS):
+            key, command = success_key.format(i), success_command.format(i)
+            executor.execute_async(key=key, command=command)
+            executor.running[key] = True
+
+        # errors are propagated for some reason
+        try:
+            executor.execute_async(key='fail', command=fail_command)
+        except:
+            pass
+
+        executor.running['fail'] = True
+
+        if parallelism == 0:
+            with timeout(seconds=5):
+                executor.end()
+        else:
+            executor.end()
+
+        for i in range(self.TEST_SUCCESS_COMMANDS):
+            key = success_key.format(i)
+            self.assertTrue(executor.event_buffer[key], State.SUCCESS)
+        self.assertTrue(executor.event_buffer['fail'], State.FAILED)
+
+        for i in range(self.TEST_SUCCESS_COMMANDS):
+            self.assertNotIn(success_key.format(i), executor.running)
+        self.assertNotIn('fail', executor.running)
+
+        expected = self.TEST_SUCCESS_COMMANDS + 1 if parallelism == 0 else parallelism
+        self.assertEqual(executor.workers_used, expected)
+
+    def test_execution_unlimited_parallelism(self):
+        self.execution_parallelism(parallelism=0)
+
+    def test_execution_limited_parallelism(self):
+        test_parallelism = 2
+        self.execution_parallelism(parallelism=test_parallelism)
+
+
+if __name__ == '__main__':
+    unittest.main()