You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/03/19 18:49:56 UTC

incubator-airflow git commit: [AIRFLOW-2211] Rename hdfs_sensors.py to hdfs_sensor.py for consistency

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 37072ab52 -> e83b01551


[AIRFLOW-2211] Rename hdfs_sensors.py to hdfs_sensor.py for consistency

Closes #3127 from feng-tao/airflow-2211


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e83b0155
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e83b0155
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e83b0155

Branch: refs/heads/master
Commit: e83b0155164e46f29868c9813cda430cd20472d1
Parents: 37072ab
Author: Tao feng <tf...@lyft.com>
Authored: Mon Mar 19 19:49:49 2018 +0100
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Mon Mar 19 19:49:49 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/sensors/hdfs_sensor.py     |  71 +++++++
 airflow/contrib/sensors/hdfs_sensors.py    |  64 ------
 tests/contrib/sensors/test_hdfs_sensor.py  | 248 ++++++++++++++++++++++++
 tests/contrib/sensors/test_hdfs_sensors.py | 248 ------------------------
 4 files changed, 319 insertions(+), 312 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e83b0155/airflow/contrib/sensors/hdfs_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/hdfs_sensor.py b/airflow/contrib/sensors/hdfs_sensor.py
new file mode 100644
index 0000000..1ff395f
--- /dev/null
+++ b/airflow/contrib/sensors/hdfs_sensor.py
@@ -0,0 +1,71 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from airflow.sensors.hdfs_sensor import HdfsSensor
+
+
+class HdfsSensorRegex(HdfsSensor):
+    def __init__(self,
+                 regex,
+                 *args,
+                 **kwargs):
+        super(HdfsSensorRegex, self).__init__(*args, **kwargs)
+        self.regex = regex
+
+    def poke(self, context):
+        """
+        poke matching files in a directory with self.regex
+        :return: Bool depending on the search criteria
+        """
+        sb = self.hook(self.hdfs_conn_id).get_conn()
+        self.log.info(
+            'Poking for {self.filepath} to be a directory '
+            'with files matching {self.regex.pattern}'.
+            format(**locals())
+        )
+        result = [f for f in sb.ls([self.filepath], include_toplevel=False) if
+                  f['file_type'] == 'f' and
+                  self.regex.match(f['path'].replace('%s/' % self.filepath, ''))]
+        result = self.filter_for_ignored_ext(result, self.ignored_ext,
+                                             self.ignore_copying)
+        result = self.filter_for_filesize(result, self.file_size)
+        return bool(result)
+
+
+class HdfsSensorFolder(HdfsSensor):
+    def __init__(self,
+                 be_empty=False,
+                 *args,
+                 **kwargs):
+        super(HdfsSensorFolder, self).__init__(*args, **kwargs)
+        self.be_empty = be_empty
+
+    def poke(self, context):
+        """
+        poke for a non empty directory
+        :return: Bool depending on the search criteria
+        """
+        sb = self.hook(self.hdfs_conn_id).get_conn()
+        result = [f for f in sb.ls([self.filepath], include_toplevel=True)]
+        result = self.filter_for_ignored_ext(result, self.ignored_ext,
+                                             self.ignore_copying)
+        result = self.filter_for_filesize(result, self.file_size)
+        if self.be_empty:
+            self.log.info('Poking for filepath {self.filepath} to a empty directory'
+                          .format(**locals()))
+            return len(result) == 1 and result[0]['path'] == self.filepath
+        else:
+            self.log.info('Poking for filepath {self.filepath} to a non empty directory'
+                          .format(**locals()))
+            result.pop(0)
+            return bool(result) and result[0]['file_type'] == 'f'

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e83b0155/airflow/contrib/sensors/hdfs_sensors.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/hdfs_sensors.py b/airflow/contrib/sensors/hdfs_sensors.py
deleted file mode 100644
index c96005a..0000000
--- a/airflow/contrib/sensors/hdfs_sensors.py
+++ /dev/null
@@ -1,64 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from airflow.sensors.hdfs_sensor import HdfsSensor
-
-
-class HdfsSensorRegex(HdfsSensor):
-    # Sensor that waits for files in `filepath` whose names match `regex`.
-    # (Pre-rename copy removed by this commit; the same class is re-added in
-    # airflow/contrib/sensors/hdfs_sensor.py.)
-    def __init__(self,
-                 regex,
-                 *args,
-                 **kwargs):
-        super(HdfsSensorRegex, self).__init__(*args, **kwargs)
-        # expected to be a compiled pattern: poke uses .match and .pattern
-        self.regex = regex
-
-    def poke(self, context):
-        """
-        poke matching files in a directory with self.regex
-        :param context: task context (unused here)
-        :return: Bool depending on the search criteria
-        """
-        sb = self.hook(self.hdfs_conn_id).get_conn()
-        self.log.info(
-            'Poking for {self.filepath} to be a directory with files matching {self.regex.pattern}'.format(**locals())
-        )
-        # Keep only plain files ('f') whose path, with "<filepath>/" removed,
-        # matches the regex. NOTE(review): if filepath has a trailing slash
-        # the replace is a no-op and the regex sees the full path.
-        result = [f for f in sb.ls([self.filepath], include_toplevel=False) if
-                  f['file_type'] == 'f' and self.regex.match(f['path'].replace('%s/' % self.filepath, ''))]
-        result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying)
-        result = self.filter_for_filesize(result, self.file_size)
-        return bool(result)
-
-
-class HdfsSensorFolder(HdfsSensor):
-    # Sensor that waits for `filepath` to be an empty (be_empty=True) or a
-    # non-empty directory. (Pre-rename copy removed by this commit; the same
-    # class is re-added in airflow/contrib/sensors/hdfs_sensor.py.)
-    def __init__(self,
-                 be_empty=False,
-                 *args,
-                 **kwargs):
-        super(HdfsSensorFolder, self).__init__(*args, **kwargs)
-        self.be_empty = be_empty
-
-    def poke(self, context):
-        """
-        poke for a non empty directory
-        :param context: task context (unused here)
-        :return: Bool depending on the search criteria
-        """
-        sb = self.hook(self.hdfs_conn_id).get_conn()
-        # include_toplevel=True: the listing's first entry is the directory
-        # itself, followed by its contents.
-        result = [f for f in sb.ls([self.filepath], include_toplevel=True)]
-        result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying)
-        result = self.filter_for_filesize(result, self.file_size)
-        if self.be_empty:
-            self.log.info('Poking for filepath {self.filepath} to a empty directory'.format(**locals()))
-            # empty == only the directory entry itself is left
-            return len(result) == 1 and result[0]['path'] == self.filepath
-        else:
-            self.log.info('Poking for filepath {self.filepath} to a non empty directory'.format(**locals()))
-            # NOTE(review): pop(0) assumes the directory entry survived the
-            # filters; it raises IndexError if the filtered list is empty.
-            result.pop(0)
-            # succeeds only if the first remaining entry is a plain file
-            return bool(result) and result[0]['file_type'] == 'f'

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e83b0155/tests/contrib/sensors/test_hdfs_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_hdfs_sensor.py b/tests/contrib/sensors/test_hdfs_sensor.py
new file mode 100644
index 0000000..cd3c7a3
--- /dev/null
+++ b/tests/contrib/sensors/test_hdfs_sensor.py
@@ -0,0 +1,248 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+import unittest
+
+import re
+from datetime import timedelta
+
+from airflow.contrib.sensors.hdfs_sensor import HdfsSensorFolder, HdfsSensorRegex
+from airflow.exceptions import AirflowSensorTimeout
+
+
+class HdfsSensorFolderTests(unittest.TestCase):
+    def setUp(self):
+        from tests.core import FakeHDFSHook
+        self.hook = FakeHDFSHook
+        self.log = logging.getLogger()
+        self.log.setLevel(logging.DEBUG)
+
+    def test_should_be_empty_directory(self):
+        """
+        test the empty directory behaviour
+        :return:
+        """
+        # Given
+        self.log.debug('#' * 10)
+        self.log.debug('Running %s', self._testMethodName)
+        self.log.debug('#' * 10)
+        task = HdfsSensorFolder(task_id='Should_be_empty_directory',
+                                filepath='/datadirectory/empty_directory',
+                                be_empty=True,
+                                timeout=1,
+                                retry_delay=timedelta(seconds=1),
+                                poke_interval=1,
+                                hook=self.hook)
+
+        # When
+        task.execute(None)
+
+        # Then
+        # Nothing happens, nothing is raised exec is ok
+
+    def test_should_be_empty_directory_fail(self):
+        """
+        test the empty directory behaviour
+        :return:
+        """
+        # Given
+        self.log.debug('#' * 10)
+        self.log.debug('Running %s', self._testMethodName)
+        self.log.debug('#' * 10)
+        task = HdfsSensorFolder(task_id='Should_be_empty_directory_fail',
+                                filepath='/datadirectory/not_empty_directory',
+                                be_empty=True,
+                                timeout=1,
+                                retry_delay=timedelta(seconds=1),
+                                poke_interval=1,
+                                hook=self.hook)
+
+        # When
+        # Then
+        with self.assertRaises(AirflowSensorTimeout):
+            task.execute(None)
+
+    def test_should_be_a_non_empty_directory(self):
+        """
+        test the empty directory behaviour
+        :return:
+        """
+        # Given
+        self.log.debug('#' * 10)
+        self.log.debug('Running %s', self._testMethodName)
+        self.log.debug('#' * 10)
+        task = HdfsSensorFolder(task_id='Should_be_non_empty_directory',
+                                filepath='/datadirectory/not_empty_directory',
+                                timeout=1,
+                                retry_delay=timedelta(seconds=1),
+                                poke_interval=1,
+                                hook=self.hook)
+
+        # When
+        task.execute(None)
+
+        # Then
+        # Nothing happens, nothing is raised exec is ok
+
+    def test_should_be_non_empty_directory_fail(self):
+        """
+        test the empty directory behaviour
+        :return:
+        """
+        # Given
+        self.log.debug('#' * 10)
+        self.log.debug('Running %s', self._testMethodName)
+        self.log.debug('#' * 10)
+        task = HdfsSensorFolder(task_id='Should_be_empty_directory_fail',
+                                filepath='/datadirectory/empty_directory',
+                                timeout=1,
+                                retry_delay=timedelta(seconds=1),
+                                poke_interval=1,
+                                hook=self.hook)
+
+        # When
+        # Then
+        with self.assertRaises(AirflowSensorTimeout):
+            task.execute(None)
+
+
+class HdfsSensorRegexTests(unittest.TestCase):
+    def setUp(self):
+        from tests.core import FakeHDFSHook
+        self.hook = FakeHDFSHook
+        self.log = logging.getLogger()
+        self.log.setLevel(logging.DEBUG)
+
+    def test_should_match_regex(self):
+        """
+        test the empty directory behaviour
+        :return:
+        """
+        # Given
+        self.log.debug('#' * 10)
+        self.log.debug('Running %s', self._testMethodName)
+        self.log.debug('#' * 10)
+        compiled_regex = re.compile("test[1-2]file")
+        task = HdfsSensorRegex(task_id='Should_match_the_regex',
+                               filepath='/datadirectory/regex_dir',
+                               regex=compiled_regex,
+                               timeout=1,
+                               retry_delay=timedelta(seconds=1),
+                               poke_interval=1,
+                               hook=self.hook)
+
+        # When
+        task.execute(None)
+
+        # Then
+        # Nothing happens, nothing is raised exec is ok
+
+    def test_should_not_match_regex(self):
+        """
+        test the empty directory behaviour
+        :return:
+        """
+        # Given
+        self.log.debug('#' * 10)
+        self.log.debug('Running %s', self._testMethodName)
+        self.log.debug('#' * 10)
+        compiled_regex = re.compile("^IDoNotExist")
+        task = HdfsSensorRegex(task_id='Should_not_match_the_regex',
+                               filepath='/datadirectory/regex_dir',
+                               regex=compiled_regex,
+                               timeout=1,
+                               retry_delay=timedelta(seconds=1),
+                               poke_interval=1,
+                               hook=self.hook)
+
+        # When
+        # Then
+        with self.assertRaises(AirflowSensorTimeout):
+            task.execute(None)
+
+    def test_should_match_regex_and_filesize(self):
+        """
+        test the file size behaviour with regex
+        :return:
+        """
+        # Given
+        self.log.debug('#' * 10)
+        self.log.debug('Running %s', self._testMethodName)
+        self.log.debug('#' * 10)
+        compiled_regex = re.compile("test[1-2]file")
+        task = HdfsSensorRegex(task_id='Should_match_the_regex_and_filesize',
+                               filepath='/datadirectory/regex_dir',
+                               regex=compiled_regex,
+                               ignore_copying=True,
+                               ignored_ext=['_COPYING_', 'sftp'],
+                               file_size=10,
+                               timeout=1,
+                               retry_delay=timedelta(seconds=1),
+                               poke_interval=1,
+                               hook=self.hook)
+
+        # When
+        task.execute(None)
+
+        # Then
+        # Nothing happens, nothing is raised exec is ok
+
+    def test_should_match_regex_but_filesize(self):
+        """
+        test the file size behaviour with regex
+        :return:
+        """
+        # Given
+        self.log.debug('#' * 10)
+        self.log.debug('Running %s', self._testMethodName)
+        self.log.debug('#' * 10)
+        compiled_regex = re.compile("test[1-2]file")
+        task = HdfsSensorRegex(task_id='Should_match_the_regex_but_filesize',
+                               filepath='/datadirectory/regex_dir',
+                               regex=compiled_regex,
+                               file_size=20,
+                               timeout=1,
+                               retry_delay=timedelta(seconds=1),
+                               poke_interval=1,
+                               hook=self.hook)
+
+        # When
+        # Then
+        with self.assertRaises(AirflowSensorTimeout):
+            task.execute(None)
+
+    def test_should_match_regex_but_copyingext(self):
+        """
+        test the file size behaviour with regex
+        :return:
+        """
+        # Given
+        self.log.debug('#' * 10)
+        self.log.debug('Running %s', self._testMethodName)
+        self.log.debug('#' * 10)
+        compiled_regex = re.compile("copying_file_\d+.txt")
+        task = HdfsSensorRegex(task_id='Should_match_the_regex_but_filesize',
+                               filepath='/datadirectory/regex_dir',
+                               regex=compiled_regex,
+                               ignored_ext=['_COPYING_', 'sftp'],
+                               file_size=20,
+                               timeout=1,
+                               retry_delay=timedelta(seconds=1),
+                               poke_interval=1,
+                               hook=self.hook)
+
+        # When
+        # Then
+        with self.assertRaises(AirflowSensorTimeout):
+            task.execute(None)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e83b0155/tests/contrib/sensors/test_hdfs_sensors.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_hdfs_sensors.py b/tests/contrib/sensors/test_hdfs_sensors.py
deleted file mode 100644
index cd191f8..0000000
--- a/tests/contrib/sensors/test_hdfs_sensors.py
+++ /dev/null
@@ -1,248 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import logging
-import unittest
-
-import re
-from datetime import timedelta
-
-from airflow.contrib.sensors.hdfs_sensors import HdfsSensorFolder, HdfsSensorRegex
-from airflow.exceptions import AirflowSensorTimeout
-
-
-class HdfsSensorFolderTests(unittest.TestCase):
-    # Tests for HdfsSensorFolder (pre-rename copy removed by this commit; the
-    # same tests are re-added in tests/contrib/sensors/test_hdfs_sensor.py
-    # with only the import path changed).
-    def setUp(self):
-        # FakeHDFSHook presumably serves canned listings for the
-        # /datadirectory/* paths used below -- see tests.core.
-        from tests.core import FakeHDFSHook
-        self.hook = FakeHDFSHook
-        self.log = logging.getLogger()
-        self.log.setLevel(logging.DEBUG)
-
-    def test_should_be_empty_directory(self):
-        """
-        test the empty directory behaviour
-        :return:
-        """
-        # Given
-        self.log.debug('#' * 10)
-        self.log.debug('Running %s', self._testMethodName)
-        self.log.debug('#' * 10)
-        task = HdfsSensorFolder(task_id='Should_be_empty_directory',
-                                filepath='/datadirectory/empty_directory',
-                                be_empty=True,
-                                timeout=1,
-                                retry_delay=timedelta(seconds=1),
-                                poke_interval=1,
-                                hook=self.hook)
-
-        # When
-        task.execute(None)
-
-        # Then
-        # Nothing happens, nothing is raised exec is ok
-
-    def test_should_be_empty_directory_fail(self):
-        """
-        test the empty directory behaviour
-        :return:
-        """
-        # Given
-        self.log.debug('#' * 10)
-        self.log.debug('Running %s', self._testMethodName)
-        self.log.debug('#' * 10)
-        task = HdfsSensorFolder(task_id='Should_be_empty_directory_fail',
-                                filepath='/datadirectory/not_empty_directory',
-                                be_empty=True,
-                                timeout=1,
-                                retry_delay=timedelta(seconds=1),
-                                poke_interval=1,
-                                hook=self.hook)
-
-        # When
-        # Then
-        with self.assertRaises(AirflowSensorTimeout):
-            task.execute(None)
-
-    def test_should_be_a_non_empty_directory(self):
-        # NOTE(review): docstring below is copy-pasted; this test checks the
-        # default (non-empty) mode.
-        """
-        test the empty directory behaviour
-        :return:
-        """
-        # Given
-        self.log.debug('#' * 10)
-        self.log.debug('Running %s', self._testMethodName)
-        self.log.debug('#' * 10)
-        task = HdfsSensorFolder(task_id='Should_be_non_empty_directory',
-                                filepath='/datadirectory/not_empty_directory',
-                                timeout=1,
-                                retry_delay=timedelta(seconds=1),
-                                poke_interval=1,
-                                hook=self.hook)
-
-        # When
-        task.execute(None)
-
-        # Then
-        # Nothing happens, nothing is raised exec is ok
-
-    def test_should_be_non_empty_directory_fail(self):
-        # NOTE(review): docstring is copy-pasted and the task_id duplicates
-        # the be_empty failure test's; this checks non-empty mode timing out.
-        """
-        test the empty directory behaviour
-        :return:
-        """
-        # Given
-        self.log.debug('#' * 10)
-        self.log.debug('Running %s', self._testMethodName)
-        self.log.debug('#' * 10)
-        task = HdfsSensorFolder(task_id='Should_be_empty_directory_fail',
-                                filepath='/datadirectory/empty_directory',
-                                timeout=1,
-                                retry_delay=timedelta(seconds=1),
-                                poke_interval=1,
-                                hook=self.hook)
-
-        # When
-        # Then
-        with self.assertRaises(AirflowSensorTimeout):
-            task.execute(None)
-
-
-class HdfsSensorRegexTests(unittest.TestCase):
-    # Tests for HdfsSensorRegex on /datadirectory/regex_dir (pre-rename copy
-    # removed by this commit; the same tests are re-added in
-    # tests/contrib/sensors/test_hdfs_sensor.py).
-    def setUp(self):
-        from tests.core import FakeHDFSHook
-        self.hook = FakeHDFSHook
-        self.log = logging.getLogger()
-        self.log.setLevel(logging.DEBUG)
-
-    def test_should_match_regex(self):
-        # NOTE(review): docstring is copy-pasted; this checks a regex match.
-        """
-        test the empty directory behaviour
-        :return:
-        """
-        # Given
-        self.log.debug('#' * 10)
-        self.log.debug('Running %s', self._testMethodName)
-        self.log.debug('#' * 10)
-        compiled_regex = re.compile("test[1-2]file")
-        task = HdfsSensorRegex(task_id='Should_match_the_regex',
-                               filepath='/datadirectory/regex_dir',
-                               regex=compiled_regex,
-                               timeout=1,
-                               retry_delay=timedelta(seconds=1),
-                               poke_interval=1,
-                               hook=self.hook)
-
-        # When
-        task.execute(None)
-
-        # Then
-        # Nothing happens, nothing is raised exec is ok
-
-    def test_should_not_match_regex(self):
-        # NOTE(review): docstring is copy-pasted; this checks a regex miss.
-        """
-        test the empty directory behaviour
-        :return:
-        """
-        # Given
-        self.log.debug('#' * 10)
-        self.log.debug('Running %s', self._testMethodName)
-        self.log.debug('#' * 10)
-        compiled_regex = re.compile("^IDoNotExist")
-        task = HdfsSensorRegex(task_id='Should_not_match_the_regex',
-                               filepath='/datadirectory/regex_dir',
-                               regex=compiled_regex,
-                               timeout=1,
-                               retry_delay=timedelta(seconds=1),
-                               poke_interval=1,
-                               hook=self.hook)
-
-        # When
-        # Then
-        with self.assertRaises(AirflowSensorTimeout):
-            task.execute(None)
-
-    def test_should_match_regex_and_filesize(self):
-        """
-        test the file size behaviour with regex
-        :return:
-        """
-        # Given
-        self.log.debug('#' * 10)
-        self.log.debug('Running %s', self._testMethodName)
-        self.log.debug('#' * 10)
-        compiled_regex = re.compile("test[1-2]file")
-        task = HdfsSensorRegex(task_id='Should_match_the_regex_and_filesize',
-                               filepath='/datadirectory/regex_dir',
-                               regex=compiled_regex,
-                               ignore_copying=True,
-                               ignored_ext=['_COPYING_', 'sftp'],
-                               file_size=10,
-                               timeout=1,
-                               retry_delay=timedelta(seconds=1),
-                               poke_interval=1,
-                               hook=self.hook)
-
-        # When
-        task.execute(None)
-
-        # Then
-        # Nothing happens, nothing is raised exec is ok
-
-    def test_should_match_regex_but_filesize(self):
-        """
-        test the file size behaviour with regex
-        :return:
-        """
-        # Given
-        self.log.debug('#' * 10)
-        self.log.debug('Running %s', self._testMethodName)
-        self.log.debug('#' * 10)
-        compiled_regex = re.compile("test[1-2]file")
-        task = HdfsSensorRegex(task_id='Should_match_the_regex_but_filesize',
-                               filepath='/datadirectory/regex_dir',
-                               regex=compiled_regex,
-                               file_size=20,
-                               timeout=1,
-                               retry_delay=timedelta(seconds=1),
-                               poke_interval=1,
-                               hook=self.hook)
-
-        # When
-        # Then
-        with self.assertRaises(AirflowSensorTimeout):
-            task.execute(None)
-
-    def test_should_match_regex_but_copyingext(self):
-        # NOTE(review): ignore_copying is not set here, so the ignored_ext
-        # list may never be applied; the timeout may come from file_size=20
-        # alone. The task_id also duplicates the filesize test's, and the
-        # regex string is not raw ("\d" is an invalid escape).
-        """
-        test the file size behaviour with regex
-        :return:
-        """
-        # Given
-        self.log.debug('#' * 10)
-        self.log.debug('Running %s', self._testMethodName)
-        self.log.debug('#' * 10)
-        compiled_regex = re.compile("copying_file_\d+.txt")
-        task = HdfsSensorRegex(task_id='Should_match_the_regex_but_filesize',
-                               filepath='/datadirectory/regex_dir',
-                               regex=compiled_regex,
-                               ignored_ext=['_COPYING_', 'sftp'],
-                               file_size=20,
-                               timeout=1,
-                               retry_delay=timedelta(seconds=1),
-                               poke_interval=1,
-                               hook=self.hook)
-
-        # When
-        # Then
-        with self.assertRaises(AirflowSensorTimeout):
-            task.execute(None)