You are viewing a plain-text version of this content; the hyperlink to its canonical version ("here") was lost in this rendering.
Posted to commits@airflow.apache.org by fo...@apache.org on 2017/12/29 18:21:08 UTC

incubator-airflow git commit: [AIRFLOW-1470] Implement BashSensor operator

Repository: incubator-airflow
Updated Branches:
  refs/heads/master d44a82e59 -> b7c2f7169


[AIRFLOW-1470] Implement BashSensor operator

This sensor succeeds once a bash command/script
returns 0, and keeps poking otherwise. The
implementation is very similar to BashOperator.

Closes #2489 from diogoalexandrefranco/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b7c2f716
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b7c2f716
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b7c2f716

Branch: refs/heads/master
Commit: b7c2f7169b44135349d42b0ddcd6805f7f9c4d6a
Parents: d44a82e
Author: Diogo Franco <di...@farfetch.com>
Authored: Fri Dec 29 19:21:01 2017 +0100
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Fri Dec 29 19:21:01 2017 +0100

----------------------------------------------------------------------
 airflow/contrib/sensors/bash_sensor.py    | 91 ++++++++++++++++++++++++++
 docs/code.rst                             |  1 +
 tests/contrib/sensors/test_bash_sensor.py | 55 ++++++++++++++++
 3 files changed, 147 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b7c2f716/airflow/contrib/sensors/bash_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/bash_sensor.py b/airflow/contrib/sensors/bash_sensor.py
new file mode 100644
index 0000000..760ee2c
--- /dev/null
+++ b/airflow/contrib/sensors/bash_sensor.py
@@ -0,0 +1,124 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from builtins import bytes
+import os
+import signal
+from subprocess import Popen, STDOUT, PIPE
+from tempfile import gettempdir, NamedTemporaryFile
+from airflow.utils.decorators import apply_defaults
+from airflow.operators.sensors import BaseSensorOperator
+from airflow.utils.file import TemporaryDirectory
+
+
+class BashSensor(BaseSensorOperator):
+    """
+    Executes a bash command/script and returns True if and only if the
+    return code is 0.
+
+    :param bash_command: The command, set of commands or path to a bash
+        script to be executed. On every poke it is written to a temporary
+        file that is then run with ``bash``. (templated)
+    :type bash_command: string
+
+    :param env: If env is not None, it must be a mapping that defines the
+        environment variables for the new process; these are used instead
+        of inheriting the current process environment, which is the default
+        behavior. (templated)
+    :type env: dict
+    :param output_encoding: encoding used to decode the command's combined
+        stdout/stderr before each line is logged.
+    :type output_encoding: string
+    """
+
+    template_fields = ('bash_command', 'env')
+
+    @apply_defaults
+    def __init__(self,
+                 bash_command,
+                 env=None,
+                 output_encoding='utf-8',
+                 *args, **kwargs):
+        super(BashSensor, self).__init__(*args, **kwargs)
+        self.bash_command = bash_command
+        self.env = env
+        self.output_encoding = output_encoding
+        # Most recently started bash process; read by on_kill().
+        self.sp = None
+
+    def poke(self, context):
+        """
+        Run ``bash_command`` once and report whether it succeeded.
+
+        The command is written to a script inside a temporary directory,
+        which is removed afterwards, and executed with ``bash`` using that
+        directory as the working directory. Combined stdout/stderr is
+        logged line by line.
+
+        :return: True if the command exited with return code 0,
+            False otherwise.
+        """
+        bash_command = self.bash_command
+        self.log.info("Tmp dir root location: \n %s", gettempdir())
+        with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
+            with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:
+
+                f.write(bytes(bash_command, 'utf_8'))
+                f.flush()
+                fname = f.name
+                # f.name is already the full path inside tmp_dir; joining
+                # it onto tmp_dir again would log a bogus location.
+                script_location = os.path.abspath(fname)
+                self.log.info(
+                    "Temporary script location: %s",
+                    script_location
+                )
+                self.log.info("Running command: %s", bash_command)
+                # os.setsid starts bash in a new session/process group so
+                # that on_kill() can signal it together with its children.
+                sp = Popen(
+                    ['bash', fname],
+                    stdout=PIPE, stderr=STDOUT,
+                    close_fds=True, cwd=tmp_dir,
+                    env=self.env, preexec_fn=os.setsid)
+
+                self.sp = sp
+
+                self.log.info("Output:")
+                try:
+                    for raw_line in iter(sp.stdout.readline, b''):
+                        self.log.info(
+                            raw_line.decode(self.output_encoding).strip())
+                finally:
+                    # Release the pipe explicitly instead of leaving it to GC.
+                    sp.stdout.close()
+                sp.wait()
+                self.log.info("Command exited with return code %s",
+                              sp.returncode)
+
+                return not sp.returncode
+
+    def on_kill(self):
+        """
+        Send SIGTERM to the process group of the bash process started by
+        the most recent poke, if it is still running.
+
+        Because poke() starts bash with ``os.setsid``, that process is not
+        in the worker's process group and is not reached by signals sent
+        to that group, so it has to be terminated explicitly.
+        """
+        sp = self.sp
+        if sp is not None and sp.poll() is None:
+            self.log.info('Sending SIGTERM signal to bash process group')
+            os.killpg(os.getpgid(sp.pid), signal.SIGTERM)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b7c2f716/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index 6cc7f7c..e7b7a51 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -110,6 +110,7 @@ Community-contributed Operators
 .. autoclass:: airflow.contrib.operators.ssh_operator.SSHOperator
 .. autoclass:: airflow.contrib.operators.vertica_operator.VerticaOperator
 .. autoclass:: airflow.contrib.operators.vertica_to_hive.VerticaToHiveTransfer
+.. autoclass:: airflow.contrib.sensors.bash_sensor.BashSensor
 
 .. _macros:
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b7c2f716/tests/contrib/sensors/test_bash_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_bash_sensor.py b/tests/contrib/sensors/test_bash_sensor.py
new file mode 100644
index 0000000..66b18e4
--- /dev/null
+++ b/tests/contrib/sensors/test_bash_sensor.py
@@ -0,0 +1,86 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import datetime
+import os
+import unittest
+
+from airflow import DAG, configuration
+from airflow.contrib.sensors.bash_sensor import BashSensor
+from airflow.exceptions import AirflowSensorTimeout
+
+
+class TestBashSensor(unittest.TestCase):
+    """Tests for BashSensor: poke results, env handling and timeouts."""
+
+    def setUp(self):
+        configuration.load_test_config()
+        args = {
+            'owner': 'airflow',
+            'start_date': datetime.datetime(2017, 1, 1)
+        }
+        dag = DAG('test_dag_id', default_args=args)
+        self.dag = dag
+
+    def test_true_condition(self):
+        # A command exiting 0 satisfies the sensor on the first poke.
+        t = BashSensor(
+            task_id='test_true_condition',
+            bash_command='freturn() { return "$1"; }; freturn 0',
+            output_encoding='utf-8',
+            poke_interval=1,
+            timeout=2,
+            dag=self.dag
+        )
+        t.execute(None)
+
+    def test_false_condition(self):
+        # A command that never exits 0 keeps the sensor poking until it
+        # times out.
+        t = BashSensor(
+            task_id='test_false_condition',
+            bash_command='freturn() { return "$1"; }; freturn 1',
+            output_encoding='utf-8',
+            poke_interval=1,
+            timeout=2,
+            dag=self.dag
+        )
+        with self.assertRaises(AirflowSensorTimeout):
+            t.execute(None)
+
+    def test_poke_maps_exit_code_to_bool(self):
+        # poke() itself must return True only for exit code 0.
+        t = BashSensor(
+            task_id='test_poke_exit_code',
+            bash_command='exit 0',
+            dag=self.dag
+        )
+        self.assertTrue(t.poke(None))
+        t.bash_command = 'exit 3'
+        self.assertFalse(t.poke(None))
+
+    def test_env_is_passed_to_command(self):
+        # Variables from ``env`` must be visible to the bash process.
+        # PATH is forwarded so the replaced environment stays usable.
+        t = BashSensor(
+            task_id='test_env',
+            bash_command='test "$BASH_SENSOR_TEST_VAR" = "expected"',
+            env={
+                'PATH': os.environ.get('PATH', os.defpath),
+                'BASH_SENSOR_TEST_VAR': 'expected',
+            },
+            dag=self.dag
+        )
+        self.assertTrue(t.poke(None))