You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/03/13 02:49:29 UTC

[25/28] incubator-airflow git commit: Make compatible with 1.8

Make compatible with 1.8


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8df046bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8df046bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8df046bf

Branch: refs/heads/v1-8-test
Commit: 8df046bfbec670a253139c83c6174bb88f25ee7f
Parents: 2b26a5d
Author: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Authored: Sun Mar 12 10:11:15 2017 -0700
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 10:11:15 2017 -0700

----------------------------------------------------------------------
 tests/executors/__init__.py      | 13 ++++++++
 tests/executors/test_executor.py | 56 +++++++++++++++++++++++++++++++++++
 2 files changed, 69 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8df046bf/tests/executors/__init__.py
----------------------------------------------------------------------
diff --git a/tests/executors/__init__.py b/tests/executors/__init__.py
new file mode 100644
index 0000000..a85b772
--- /dev/null
+++ b/tests/executors/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8df046bf/tests/executors/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/executors/test_executor.py b/tests/executors/test_executor.py
new file mode 100644
index 0000000..9ec6cd4
--- /dev/null
+++ b/tests/executors/test_executor.py
@@ -0,0 +1,56 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from airflow.executors.base_executor import BaseExecutor
+from airflow.utils.state import State
+
+from airflow import settings
+
+
+class TestExecutor(BaseExecutor):
+    """
+    Unit-test executor: heartbeat() sets task states itself; no command is run.
+    """
+    def __init__(self, do_update=False, *args, **kwargs):
+        self.do_update = do_update  # when True, heartbeat() changes task states
+        self._running = []  # task instances heartbeat() has set to RUNNING
+        self.history = []  # one list of queued_tasks values per updating heartbeat
+
+        super(TestExecutor, self).__init__(*args, **kwargs)
+
+    def execute_async(self, key, command, queue=None):  # logs only; args unused
+        self.logger.debug("{} running task instances".format(len(self.running)))  # NOTE(review): self.running, not self._running — confirm
+        self.logger.debug("{} in queue".format(len(self.queued_tasks)))
+
+    def heartbeat(self):
+        session = settings.Session()  # created even when do_update is False
+        if self.do_update:
+            self.history.append(list(self.queued_tasks.values()))  # snapshot before the queue is drained
+            while len(self._running) > 0:  # mark everything currently tracked as SUCCESS
+                ti = self._running.pop()
+                ti.set_state(State.SUCCESS, session)
+            for key, val in list(self.queued_tasks.items()):  # list() copy: entries are popped in the loop
+                (command, priority, queue, ti) = val  # only ti is used below
+                ti.set_state(State.RUNNING, session)
+                self._running.append(ti)
+                self.queued_tasks.pop(key)
+
+        session.commit()  # NOTE(review): no try/finally — session is not closed if set_state raises
+        session.close()
+
+    def terminate(self):
+        pass  # no-op
+
+    def end(self):
+        self.sync()  # sync() is not defined in this class; presumably inherited — confirm
+