You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2017/12/29 18:21:08 UTC
incubator-airflow git commit: [AIRFLOW-1470] Implement BashSensor
operator
Repository: incubator-airflow
Updated Branches:
refs/heads/master d44a82e59 -> b7c2f7169
[AIRFLOW-1470] Implement BashSensor operator
This sensor succeeds once a bash command/script
returns 0, and keeps poking otherwise. The
implementation is very similar to BashOperator.
Closes #2489 from diogoalexandrefranco/master
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b7c2f716
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b7c2f716
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b7c2f716
Branch: refs/heads/master
Commit: b7c2f7169b44135349d42b0ddcd6805f7f9c4d6a
Parents: d44a82e
Author: Diogo Franco <di...@farfetch.com>
Authored: Fri Dec 29 19:21:01 2017 +0100
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Fri Dec 29 19:21:01 2017 +0100
----------------------------------------------------------------------
airflow/contrib/sensors/bash_sensor.py | 91 ++++++++++++++++++++++++++
docs/code.rst | 1 +
tests/contrib/sensors/test_bash_sensor.py | 55 ++++++++++++++++
3 files changed, 147 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b7c2f716/airflow/contrib/sensors/bash_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/bash_sensor.py b/airflow/contrib/sensors/bash_sensor.py
new file mode 100644
index 0000000..760ee2c
--- /dev/null
+++ b/airflow/contrib/sensors/bash_sensor.py
@@ -0,0 +1,91 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from builtins import bytes
+import os
+from subprocess import Popen, STDOUT, PIPE
+from tempfile import gettempdir, NamedTemporaryFile
+from airflow.utils.decorators import apply_defaults
+from airflow.operators.sensors import BaseSensorOperator
+from airflow.utils.file import TemporaryDirectory
+
+
+class BashSensor(BaseSensorOperator):
+ """
+ Executes a bash command/script and returns True if and only if the
+ return code is 0.
+
+ :param bash_command: The command, set of commands or reference to a
+ bash script (must be '.sh') to be executed.
+ :type bash_command: string
+
+ :param env: If env is not None, it must be a mapping that defines the
+ environment variables for the new process; these are used instead
+ of inheriting the current process environment, which is the default
+ behavior. (templated)
+ :type env: dict
+ :param output_encoding: output encoding of bash command.
+ :type output_encoding: string
+ """
+
+ template_fields = ('bash_command', 'env')
+
+ @apply_defaults
+ def __init__(self,
+ bash_command,
+ env=None,
+ output_encoding='utf-8',
+ *args, **kwargs):
+ super(BashSensor, self).__init__(*args, **kwargs)
+ self.bash_command = bash_command
+ self.env = env
+ self.output_encoding = output_encoding
+
+ def poke(self, context):
+ """
+ Execute the bash command in a temporary directory
+ which will be cleaned afterwards
+ """
+ bash_command = self.bash_command
+ self.log.info("Tmp dir root location: \n %s", gettempdir())
+ with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
+ with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:
+
+ f.write(bytes(bash_command, 'utf_8'))
+ f.flush()
+ fname = f.name
+ script_location = tmp_dir + "/" + fname
+ self.log.info(
+ "Temporary script location: %s",
+ script_location
+ )
+ self.log.info("Running command: %s", bash_command)
+ sp = Popen(
+ ['bash', fname],
+ stdout=PIPE, stderr=STDOUT,
+ close_fds=True, cwd=tmp_dir,
+ env=self.env, preexec_fn=os.setsid)
+
+ self.sp = sp
+
+ self.log.info("Output:")
+ line = ''
+ for line in iter(sp.stdout.readline, b''):
+ line = line.decode(self.output_encoding).strip()
+ self.log.info(line)
+ sp.wait()
+ self.log.info("Command exited with "
+ "return code {0}".format(sp.returncode))
+
+ return not sp.returncode
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b7c2f716/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index 6cc7f7c..e7b7a51 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -110,6 +110,7 @@ Community-contributed Operators
.. autoclass:: airflow.contrib.operators.ssh_operator.SSHOperator
.. autoclass:: airflow.contrib.operators.vertica_operator.VerticaOperator
.. autoclass:: airflow.contrib.operators.vertica_to_hive.VerticaToHiveTransfer
+.. autoclass:: airflow.contrib.sensors.bash_sensor.BashSensor
.. _macros:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b7c2f716/tests/contrib/sensors/test_bash_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_bash_sensor.py b/tests/contrib/sensors/test_bash_sensor.py
new file mode 100644
index 0000000..66b18e4
--- /dev/null
+++ b/tests/contrib/sensors/test_bash_sensor.py
@@ -0,0 +1,55 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import datetime
+import unittest
+
+from airflow import DAG, configuration
+from airflow.contrib.sensors.bash_sensor import BashSensor
+from airflow.exceptions import AirflowSensorTimeout
+
+
+class TestBashSensor(unittest.TestCase):
+ def setUp(self):
+ configuration.load_test_config()
+ args = {
+ 'owner': 'airflow',
+ 'start_date': datetime.datetime(2017, 1, 1)
+ }
+ dag = DAG('test_dag_id', default_args=args)
+ self.dag = dag
+
+ def test_true_condition(self):
+ t = BashSensor(
+ task_id='test_true_condition',
+ bash_command='freturn() { return "$1"; }; freturn 0',
+ output_encoding='utf-8',
+ poke_interval=1,
+ timeout=2,
+ dag=self.dag
+ )
+ t.execute(None)
+
+ def test_false_condition(self):
+ t = BashSensor(
+ task_id='test_false_condition',
+ bash_command='freturn() { return "$1"; }; freturn 1',
+ output_encoding='utf-8',
+ poke_interval=1,
+ timeout=2,
+ dag=self.dag
+ )
+ with self.assertRaises(AirflowSensorTimeout):
+ t.execute(None)