You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/02/19 08:52:31 UTC

incubator-airflow git commit: [AIRFLOW-866] Add FTPSensor

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 0ed608dca -> 5f87f8a68


[AIRFLOW-866] Add FTPSensor

Closes #2070 from s7anley/ftp-sensor


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/5f87f8a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/5f87f8a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/5f87f8a6

Branch: refs/heads/master
Commit: 5f87f8a686550ade1c13f133a97523e8ebc3619f
Parents: 0ed608d
Author: Ján Koščo <3k...@gmail.com>
Authored: Sun Feb 19 09:52:16 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sun Feb 19 09:52:19 2017 +0100

----------------------------------------------------------------------
 airflow/contrib/sensors/ftp_sensor.py | 64 +++++++++++++++++++++++++++++
 tests/contrib/sensors/ftp_sensor.py   | 66 ++++++++++++++++++++++++++++++
 2 files changed, 130 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5f87f8a6/airflow/contrib/sensors/ftp_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/ftp_sensor.py b/airflow/contrib/sensors/ftp_sensor.py
new file mode 100644
index 0000000..4a9428b
--- /dev/null
+++ b/airflow/contrib/sensors/ftp_sensor.py
@@ -0,0 +1,64 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import ftplib
+import logging
+import sys
+
+from airflow.contrib.hooks.ftp_hook import FTPHook, FTPSHook
+from airflow.operators.sensors import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class FTPSensor(BaseSensorOperator):
+    """
+    Waits for a file or directory to be present on FTP.
+
+    :param path: Remote file or directory path
+    :type path: str
+    :param ftp_conn_id: The connection to run the sensor against
+    :type ftp_conn_id: str
+    """
+    template_fields = ('path',)
+
+    @apply_defaults
+    def __init__(self, path, ftp_conn_id='ftp_default', *args, **kwargs):
+        super(FTPSensor, self).__init__(*args, **kwargs)
+
+        self.path = path
+        self.ftp_conn_id = ftp_conn_id
+
+    def _create_hook(self):
+        """Return connection hook."""
+        return FTPHook(ftp_conn_id=self.ftp_conn_id)
+
+    def poke(self, context):
+        with self._create_hook() as hook:
+            logging.info('Poking for %s', self.path)
+            try:
+                hook.get_mod_time(self.path)
+            except ftplib.error_perm as e:
+                error = str(e).split(None, 1)
+                if error[1] != "Can't check for file existence":
+                    raise e
+
+                return False
+
+            return True
+
+
+class FTPSSensor(FTPSensor):
+    """Waits for a file or directory to be present on FTP over SSL."""
+    def _create_hook(self):
+        """Return an FTPSHook (not FTPHook) bound to ``ftp_conn_id``."""
+        return FTPSHook(ftp_conn_id=self.ftp_conn_id)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5f87f8a6/tests/contrib/sensors/ftp_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/ftp_sensor.py b/tests/contrib/sensors/ftp_sensor.py
new file mode 100644
index 0000000..50f8b8b
--- /dev/null
+++ b/tests/contrib/sensors/ftp_sensor.py
@@ -0,0 +1,66 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from ftplib import error_perm
+
+from mock import MagicMock
+
+from airflow.contrib.hooks.ftp_hook import FTPHook
+from airflow.contrib.sensors.ftp_sensor import FTPSensor
+
+
+class TestFTPSensor(unittest.TestCase):
+    def setUp(self):
+        super(TestFTPSensor, self).setUp()
+        self._create_hook_orig = FTPSensor._create_hook
+        self.hook_mock = MagicMock(spec=FTPHook)
+
+        def _create_hook_mock(sensor):
+            mock = MagicMock()
+            mock.__enter__ = lambda x: self.hook_mock
+
+            return mock
+
+        FTPSensor._create_hook = _create_hook_mock
+
+    def tearDown(self):
+        FTPSensor._create_hook = self._create_hook_orig
+        super(TestFTPSensor, self).tearDown()
+
+    def test_poke(self):
+        op = FTPSensor(path="foobar.json", ftp_conn_id="bob_ftp",
+                       task_id="test_task")
+
+        self.hook_mock.get_mod_time.side_effect = \
+            [error_perm("550: Can't check for file existence"), None]
+
+        self.assertFalse(op.poke(None))
+        self.assertTrue(op.poke(None))
+
+    def test_poke_fails_due_error(self):
+        op = FTPSensor(path="foobar.json", ftp_conn_id="bob_ftp",
+                       task_id="test_task")
+
+        self.hook_mock.get_mod_time.side_effect = \
+            error_perm("530: Login authentication failed")
+
+        with self.assertRaises(error_perm) as context:
+            op.execute(None)
+
+        self.assertTrue("530" in str(context.exception))
+
+
+if __name__ == '__main__':
+    unittest.main()  # run this module's tests when executed as a script