You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/02/19 08:52:31 UTC
incubator-airflow git commit: [AIRFLOW-866] Add FTPSensor
Repository: incubator-airflow
Updated Branches:
refs/heads/master 0ed608dca -> 5f87f8a68
[AIRFLOW-866] Add FTPSensor
Closes #2070 from s7anley/ftp-sensor
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/5f87f8a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/5f87f8a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/5f87f8a6
Branch: refs/heads/master
Commit: 5f87f8a686550ade1c13f133a97523e8ebc3619f
Parents: 0ed608d
Author: J�n Ko\u0161\u010do <3k...@gmail.com>
Authored: Sun Feb 19 09:52:16 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sun Feb 19 09:52:19 2017 +0100
----------------------------------------------------------------------
airflow/contrib/sensors/ftp_sensor.py | 64 +++++++++++++++++++++++++++++
tests/contrib/sensors/ftp_sensor.py | 66 ++++++++++++++++++++++++++++++
2 files changed, 130 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5f87f8a6/airflow/contrib/sensors/ftp_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/ftp_sensor.py b/airflow/contrib/sensors/ftp_sensor.py
new file mode 100644
index 0000000..4a9428b
--- /dev/null
+++ b/airflow/contrib/sensors/ftp_sensor.py
@@ -0,0 +1,64 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import ftplib
+import logging
+
+from airflow.contrib.hooks.ftp_hook import FTPHook, FTPSHook
+from airflow.operators.sensors import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class FTPSensor(BaseSensorOperator):
+ """
+ Waits for a file or directory to be present on FTP.
+
+ :param path: Remote file or directory path
+ :type path: str
+ :param ftp_conn_id: The connection to run the sensor against
+ :type ftp_conn_id: str
+ """
+ template_fields = ('path',)
+
+ @apply_defaults
+ def __init__(self, path, ftp_conn_id='ftp_default', *args, **kwargs):
+ super(FTPSensor, self).__init__(*args, **kwargs)
+
+ self.path = path
+ self.ftp_conn_id = ftp_conn_id
+
+ def _create_hook(self):
+ """Return connection hook."""
+ return FTPHook(ftp_conn_id=self.ftp_conn_id)
+
+ def poke(self, context):
+ with self._create_hook() as hook:
+ logging.info('Poking for %s', self.path)
+ try:
+ hook.get_mod_time(self.path)
+ except ftplib.error_perm as e:
+ error = str(e).split(None, 1)
+ if error[1] != "Can't check for file existence":
+ raise e
+
+ return False
+
+ return True
+
+
+class FTPSSensor(FTPSensor):
+ """Waits for a file or directory to be present on FTP over SSL."""
+ def _create_hook(self):
+ """Return connection hook."""
+ return FTPSHook(ftp_conn_id=self.ftp_conn_id)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5f87f8a6/tests/contrib/sensors/ftp_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/ftp_sensor.py b/tests/contrib/sensors/ftp_sensor.py
new file mode 100644
index 0000000..50f8b8b
--- /dev/null
+++ b/tests/contrib/sensors/ftp_sensor.py
@@ -0,0 +1,66 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from ftplib import error_perm
+
+from mock import MagicMock
+
+from airflow.contrib.hooks.ftp_hook import FTPHook
+from airflow.contrib.sensors.ftp_sensor import FTPSensor
+
+
+class TestFTPSensor(unittest.TestCase):
+ def setUp(self):
+ super(TestFTPSensor, self).setUp()
+ self._create_hook_orig = FTPSensor._create_hook
+ self.hook_mock = MagicMock(spec=FTPHook)
+
+ def _create_hook_mock(sensor):
+ mock = MagicMock()
+ mock.__enter__ = lambda x: self.hook_mock
+
+ return mock
+
+ FTPSensor._create_hook = _create_hook_mock
+
+ def tearDown(self):
+ FTPSensor._create_hook = self._create_hook_orig
+ super(TestFTPSensor, self).tearDown()
+
+ def test_poke(self):
+ op = FTPSensor(path="foobar.json", ftp_conn_id="bob_ftp",
+ task_id="test_task")
+
+ self.hook_mock.get_mod_time.side_effect = \
+ [error_perm("550: Can't check for file existence"), None]
+
+ self.assertFalse(op.poke(None))
+ self.assertTrue(op.poke(None))
+
+ def test_poke_fails_due_error(self):
+ op = FTPSensor(path="foobar.json", ftp_conn_id="bob_ftp",
+ task_id="test_task")
+
+ self.hook_mock.get_mod_time.side_effect = \
+ error_perm("530: Login authentication failed")
+
+ with self.assertRaises(error_perm) as context:
+ op.execute(None)
+
+ self.assertTrue("530" in str(context.exception))
+
+
+if __name__ == '__main__':
+ unittest.main()