You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/03/19 18:49:56 UTC
incubator-airflow git commit: [AIRFLOW-2211] Rename hdfs_sensors.py to hdfs_sensor.py for consistency
Repository: incubator-airflow
Updated Branches:
refs/heads/master 37072ab52 -> e83b01551
[AIRFLOW-2211] Rename hdfs_sensors.py to hdfs_sensor.py for consistency
Closes #3127 from feng-tao/airflow-2211
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e83b0155
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e83b0155
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e83b0155
Branch: refs/heads/master
Commit: e83b0155164e46f29868c9813cda430cd20472d1
Parents: 37072ab
Author: Tao feng <tf...@lyft.com>
Authored: Mon Mar 19 19:49:49 2018 +0100
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Mon Mar 19 19:49:49 2018 +0100
----------------------------------------------------------------------
airflow/contrib/sensors/hdfs_sensor.py | 71 +++++++
airflow/contrib/sensors/hdfs_sensors.py | 64 ------
tests/contrib/sensors/test_hdfs_sensor.py | 248 ++++++++++++++++++++++++
tests/contrib/sensors/test_hdfs_sensors.py | 248 ------------------------
4 files changed, 319 insertions(+), 312 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e83b0155/airflow/contrib/sensors/hdfs_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/hdfs_sensor.py b/airflow/contrib/sensors/hdfs_sensor.py
new file mode 100644
index 0000000..1ff395f
--- /dev/null
+++ b/airflow/contrib/sensors/hdfs_sensor.py
@@ -0,0 +1,71 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from airflow.sensors.hdfs_sensor import HdfsSensor
+
+
+class HdfsSensorRegex(HdfsSensor):
+ def __init__(self,
+ regex,
+ *args,
+ **kwargs):
+ super(HdfsSensorRegex, self).__init__(*args, **kwargs)
+ self.regex = regex
+
+ def poke(self, context):
+ """
+ poke matching files in a directory with self.regex
+ :return: Bool depending on the search criteria
+ """
+ sb = self.hook(self.hdfs_conn_id).get_conn()
+ self.log.info(
+ 'Poking for {self.filepath} to be a directory '
+ 'with files matching {self.regex.pattern}'.
+ format(**locals())
+ )
+ result = [f for f in sb.ls([self.filepath], include_toplevel=False) if
+ f['file_type'] == 'f' and
+ self.regex.match(f['path'].replace('%s/' % self.filepath, ''))]
+ result = self.filter_for_ignored_ext(result, self.ignored_ext,
+ self.ignore_copying)
+ result = self.filter_for_filesize(result, self.file_size)
+ return bool(result)
+
+
+class HdfsSensorFolder(HdfsSensor):
+ def __init__(self,
+ be_empty=False,
+ *args,
+ **kwargs):
+ super(HdfsSensorFolder, self).__init__(*args, **kwargs)
+ self.be_empty = be_empty
+
+ def poke(self, context):
+ """
+ poke for a non empty directory
+ :return: Bool depending on the search criteria
+ """
+ sb = self.hook(self.hdfs_conn_id).get_conn()
+ result = [f for f in sb.ls([self.filepath], include_toplevel=True)]
+ result = self.filter_for_ignored_ext(result, self.ignored_ext,
+ self.ignore_copying)
+ result = self.filter_for_filesize(result, self.file_size)
+ if self.be_empty:
+ self.log.info('Poking for filepath {self.filepath} to a empty directory'
+ .format(**locals()))
+ return len(result) == 1 and result[0]['path'] == self.filepath
+ else:
+ self.log.info('Poking for filepath {self.filepath} to a non empty directory'
+ .format(**locals()))
+ result.pop(0)
+ return bool(result) and result[0]['file_type'] == 'f'
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e83b0155/airflow/contrib/sensors/hdfs_sensors.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/hdfs_sensors.py b/airflow/contrib/sensors/hdfs_sensors.py
deleted file mode 100644
index c96005a..0000000
--- a/airflow/contrib/sensors/hdfs_sensors.py
+++ /dev/null
@@ -1,64 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from airflow.sensors.hdfs_sensor import HdfsSensor
-
-
-class HdfsSensorRegex(HdfsSensor):
- def __init__(self,
- regex,
- *args,
- **kwargs):
- super(HdfsSensorRegex, self).__init__(*args, **kwargs)
- self.regex = regex
-
- def poke(self, context):
- """
- poke matching files in a directory with self.regex
- :return: Bool depending on the search criteria
- """
- sb = self.hook(self.hdfs_conn_id).get_conn()
- self.log.info(
- 'Poking for {self.filepath} to be a directory with files matching {self.regex.pattern}'.format(**locals())
- )
- result = [f for f in sb.ls([self.filepath], include_toplevel=False) if
- f['file_type'] == 'f' and self.regex.match(f['path'].replace('%s/' % self.filepath, ''))]
- result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying)
- result = self.filter_for_filesize(result, self.file_size)
- return bool(result)
-
-
-class HdfsSensorFolder(HdfsSensor):
- def __init__(self,
- be_empty=False,
- *args,
- **kwargs):
- super(HdfsSensorFolder, self).__init__(*args, **kwargs)
- self.be_empty = be_empty
-
- def poke(self, context):
- """
- poke for a non empty directory
- :return: Bool depending on the search criteria
- """
- sb = self.hook(self.hdfs_conn_id).get_conn()
- result = [f for f in sb.ls([self.filepath], include_toplevel=True)]
- result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying)
- result = self.filter_for_filesize(result, self.file_size)
- if self.be_empty:
- self.log.info('Poking for filepath {self.filepath} to a empty directory'.format(**locals()))
- return len(result) == 1 and result[0]['path'] == self.filepath
- else:
- self.log.info('Poking for filepath {self.filepath} to a non empty directory'.format(**locals()))
- result.pop(0)
- return bool(result) and result[0]['file_type'] == 'f'
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e83b0155/tests/contrib/sensors/test_hdfs_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_hdfs_sensor.py b/tests/contrib/sensors/test_hdfs_sensor.py
new file mode 100644
index 0000000..cd3c7a3
--- /dev/null
+++ b/tests/contrib/sensors/test_hdfs_sensor.py
@@ -0,0 +1,248 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+import unittest
+
+import re
+from datetime import timedelta
+
+from airflow.contrib.sensors.hdfs_sensor import HdfsSensorFolder, HdfsSensorRegex
+from airflow.exceptions import AirflowSensorTimeout
+
+
+class HdfsSensorFolderTests(unittest.TestCase):
+ def setUp(self):
+ from tests.core import FakeHDFSHook
+ self.hook = FakeHDFSHook
+ self.log = logging.getLogger()
+ self.log.setLevel(logging.DEBUG)
+
+ def test_should_be_empty_directory(self):
+ """
+ test the empty directory behaviour
+ :return:
+ """
+ # Given
+ self.log.debug('#' * 10)
+ self.log.debug('Running %s', self._testMethodName)
+ self.log.debug('#' * 10)
+ task = HdfsSensorFolder(task_id='Should_be_empty_directory',
+ filepath='/datadirectory/empty_directory',
+ be_empty=True,
+ timeout=1,
+ retry_delay=timedelta(seconds=1),
+ poke_interval=1,
+ hook=self.hook)
+
+ # When
+ task.execute(None)
+
+ # Then
+ # Nothing happens, nothing is raised exec is ok
+
+ def test_should_be_empty_directory_fail(self):
+ """
+ test the empty directory behaviour
+ :return:
+ """
+ # Given
+ self.log.debug('#' * 10)
+ self.log.debug('Running %s', self._testMethodName)
+ self.log.debug('#' * 10)
+ task = HdfsSensorFolder(task_id='Should_be_empty_directory_fail',
+ filepath='/datadirectory/not_empty_directory',
+ be_empty=True,
+ timeout=1,
+ retry_delay=timedelta(seconds=1),
+ poke_interval=1,
+ hook=self.hook)
+
+ # When
+ # Then
+ with self.assertRaises(AirflowSensorTimeout):
+ task.execute(None)
+
+ def test_should_be_a_non_empty_directory(self):
+ """
+ test the empty directory behaviour
+ :return:
+ """
+ # Given
+ self.log.debug('#' * 10)
+ self.log.debug('Running %s', self._testMethodName)
+ self.log.debug('#' * 10)
+ task = HdfsSensorFolder(task_id='Should_be_non_empty_directory',
+ filepath='/datadirectory/not_empty_directory',
+ timeout=1,
+ retry_delay=timedelta(seconds=1),
+ poke_interval=1,
+ hook=self.hook)
+
+ # When
+ task.execute(None)
+
+ # Then
+ # Nothing happens, nothing is raised exec is ok
+
+ def test_should_be_non_empty_directory_fail(self):
+ """
+ test the empty directory behaviour
+ :return:
+ """
+ # Given
+ self.log.debug('#' * 10)
+ self.log.debug('Running %s', self._testMethodName)
+ self.log.debug('#' * 10)
+ task = HdfsSensorFolder(task_id='Should_be_empty_directory_fail',
+ filepath='/datadirectory/empty_directory',
+ timeout=1,
+ retry_delay=timedelta(seconds=1),
+ poke_interval=1,
+ hook=self.hook)
+
+ # When
+ # Then
+ with self.assertRaises(AirflowSensorTimeout):
+ task.execute(None)
+
+
+class HdfsSensorRegexTests(unittest.TestCase):
+ def setUp(self):
+ from tests.core import FakeHDFSHook
+ self.hook = FakeHDFSHook
+ self.log = logging.getLogger()
+ self.log.setLevel(logging.DEBUG)
+
+ def test_should_match_regex(self):
+ """
+ test the empty directory behaviour
+ :return:
+ """
+ # Given
+ self.log.debug('#' * 10)
+ self.log.debug('Running %s', self._testMethodName)
+ self.log.debug('#' * 10)
+ compiled_regex = re.compile("test[1-2]file")
+ task = HdfsSensorRegex(task_id='Should_match_the_regex',
+ filepath='/datadirectory/regex_dir',
+ regex=compiled_regex,
+ timeout=1,
+ retry_delay=timedelta(seconds=1),
+ poke_interval=1,
+ hook=self.hook)
+
+ # When
+ task.execute(None)
+
+ # Then
+ # Nothing happens, nothing is raised exec is ok
+
+ def test_should_not_match_regex(self):
+ """
+ test the empty directory behaviour
+ :return:
+ """
+ # Given
+ self.log.debug('#' * 10)
+ self.log.debug('Running %s', self._testMethodName)
+ self.log.debug('#' * 10)
+ compiled_regex = re.compile("^IDoNotExist")
+ task = HdfsSensorRegex(task_id='Should_not_match_the_regex',
+ filepath='/datadirectory/regex_dir',
+ regex=compiled_regex,
+ timeout=1,
+ retry_delay=timedelta(seconds=1),
+ poke_interval=1,
+ hook=self.hook)
+
+ # When
+ # Then
+ with self.assertRaises(AirflowSensorTimeout):
+ task.execute(None)
+
+ def test_should_match_regex_and_filesize(self):
+ """
+ test the file size behaviour with regex
+ :return:
+ """
+ # Given
+ self.log.debug('#' * 10)
+ self.log.debug('Running %s', self._testMethodName)
+ self.log.debug('#' * 10)
+ compiled_regex = re.compile("test[1-2]file")
+ task = HdfsSensorRegex(task_id='Should_match_the_regex_and_filesize',
+ filepath='/datadirectory/regex_dir',
+ regex=compiled_regex,
+ ignore_copying=True,
+ ignored_ext=['_COPYING_', 'sftp'],
+ file_size=10,
+ timeout=1,
+ retry_delay=timedelta(seconds=1),
+ poke_interval=1,
+ hook=self.hook)
+
+ # When
+ task.execute(None)
+
+ # Then
+ # Nothing happens, nothing is raised exec is ok
+
+ def test_should_match_regex_but_filesize(self):
+ """
+ test the file size behaviour with regex
+ :return:
+ """
+ # Given
+ self.log.debug('#' * 10)
+ self.log.debug('Running %s', self._testMethodName)
+ self.log.debug('#' * 10)
+ compiled_regex = re.compile("test[1-2]file")
+ task = HdfsSensorRegex(task_id='Should_match_the_regex_but_filesize',
+ filepath='/datadirectory/regex_dir',
+ regex=compiled_regex,
+ file_size=20,
+ timeout=1,
+ retry_delay=timedelta(seconds=1),
+ poke_interval=1,
+ hook=self.hook)
+
+ # When
+ # Then
+ with self.assertRaises(AirflowSensorTimeout):
+ task.execute(None)
+
+ def test_should_match_regex_but_copyingext(self):
+ """
+ test the file size behaviour with regex
+ :return:
+ """
+ # Given
+ self.log.debug('#' * 10)
+ self.log.debug('Running %s', self._testMethodName)
+ self.log.debug('#' * 10)
+ compiled_regex = re.compile("copying_file_\d+.txt")
+ task = HdfsSensorRegex(task_id='Should_match_the_regex_but_filesize',
+ filepath='/datadirectory/regex_dir',
+ regex=compiled_regex,
+ ignored_ext=['_COPYING_', 'sftp'],
+ file_size=20,
+ timeout=1,
+ retry_delay=timedelta(seconds=1),
+ poke_interval=1,
+ hook=self.hook)
+
+ # When
+ # Then
+ with self.assertRaises(AirflowSensorTimeout):
+ task.execute(None)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e83b0155/tests/contrib/sensors/test_hdfs_sensors.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_hdfs_sensors.py b/tests/contrib/sensors/test_hdfs_sensors.py
deleted file mode 100644
index cd191f8..0000000
--- a/tests/contrib/sensors/test_hdfs_sensors.py
+++ /dev/null
@@ -1,248 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import logging
-import unittest
-
-import re
-from datetime import timedelta
-
-from airflow.contrib.sensors.hdfs_sensors import HdfsSensorFolder, HdfsSensorRegex
-from airflow.exceptions import AirflowSensorTimeout
-
-
-class HdfsSensorFolderTests(unittest.TestCase):
- def setUp(self):
- from tests.core import FakeHDFSHook
- self.hook = FakeHDFSHook
- self.log = logging.getLogger()
- self.log.setLevel(logging.DEBUG)
-
- def test_should_be_empty_directory(self):
- """
- test the empty directory behaviour
- :return:
- """
- # Given
- self.log.debug('#' * 10)
- self.log.debug('Running %s', self._testMethodName)
- self.log.debug('#' * 10)
- task = HdfsSensorFolder(task_id='Should_be_empty_directory',
- filepath='/datadirectory/empty_directory',
- be_empty=True,
- timeout=1,
- retry_delay=timedelta(seconds=1),
- poke_interval=1,
- hook=self.hook)
-
- # When
- task.execute(None)
-
- # Then
- # Nothing happens, nothing is raised exec is ok
-
- def test_should_be_empty_directory_fail(self):
- """
- test the empty directory behaviour
- :return:
- """
- # Given
- self.log.debug('#' * 10)
- self.log.debug('Running %s', self._testMethodName)
- self.log.debug('#' * 10)
- task = HdfsSensorFolder(task_id='Should_be_empty_directory_fail',
- filepath='/datadirectory/not_empty_directory',
- be_empty=True,
- timeout=1,
- retry_delay=timedelta(seconds=1),
- poke_interval=1,
- hook=self.hook)
-
- # When
- # Then
- with self.assertRaises(AirflowSensorTimeout):
- task.execute(None)
-
- def test_should_be_a_non_empty_directory(self):
- """
- test the empty directory behaviour
- :return:
- """
- # Given
- self.log.debug('#' * 10)
- self.log.debug('Running %s', self._testMethodName)
- self.log.debug('#' * 10)
- task = HdfsSensorFolder(task_id='Should_be_non_empty_directory',
- filepath='/datadirectory/not_empty_directory',
- timeout=1,
- retry_delay=timedelta(seconds=1),
- poke_interval=1,
- hook=self.hook)
-
- # When
- task.execute(None)
-
- # Then
- # Nothing happens, nothing is raised exec is ok
-
- def test_should_be_non_empty_directory_fail(self):
- """
- test the empty directory behaviour
- :return:
- """
- # Given
- self.log.debug('#' * 10)
- self.log.debug('Running %s', self._testMethodName)
- self.log.debug('#' * 10)
- task = HdfsSensorFolder(task_id='Should_be_empty_directory_fail',
- filepath='/datadirectory/empty_directory',
- timeout=1,
- retry_delay=timedelta(seconds=1),
- poke_interval=1,
- hook=self.hook)
-
- # When
- # Then
- with self.assertRaises(AirflowSensorTimeout):
- task.execute(None)
-
-
-class HdfsSensorRegexTests(unittest.TestCase):
- def setUp(self):
- from tests.core import FakeHDFSHook
- self.hook = FakeHDFSHook
- self.log = logging.getLogger()
- self.log.setLevel(logging.DEBUG)
-
- def test_should_match_regex(self):
- """
- test the empty directory behaviour
- :return:
- """
- # Given
- self.log.debug('#' * 10)
- self.log.debug('Running %s', self._testMethodName)
- self.log.debug('#' * 10)
- compiled_regex = re.compile("test[1-2]file")
- task = HdfsSensorRegex(task_id='Should_match_the_regex',
- filepath='/datadirectory/regex_dir',
- regex=compiled_regex,
- timeout=1,
- retry_delay=timedelta(seconds=1),
- poke_interval=1,
- hook=self.hook)
-
- # When
- task.execute(None)
-
- # Then
- # Nothing happens, nothing is raised exec is ok
-
- def test_should_not_match_regex(self):
- """
- test the empty directory behaviour
- :return:
- """
- # Given
- self.log.debug('#' * 10)
- self.log.debug('Running %s', self._testMethodName)
- self.log.debug('#' * 10)
- compiled_regex = re.compile("^IDoNotExist")
- task = HdfsSensorRegex(task_id='Should_not_match_the_regex',
- filepath='/datadirectory/regex_dir',
- regex=compiled_regex,
- timeout=1,
- retry_delay=timedelta(seconds=1),
- poke_interval=1,
- hook=self.hook)
-
- # When
- # Then
- with self.assertRaises(AirflowSensorTimeout):
- task.execute(None)
-
- def test_should_match_regex_and_filesize(self):
- """
- test the file size behaviour with regex
- :return:
- """
- # Given
- self.log.debug('#' * 10)
- self.log.debug('Running %s', self._testMethodName)
- self.log.debug('#' * 10)
- compiled_regex = re.compile("test[1-2]file")
- task = HdfsSensorRegex(task_id='Should_match_the_regex_and_filesize',
- filepath='/datadirectory/regex_dir',
- regex=compiled_regex,
- ignore_copying=True,
- ignored_ext=['_COPYING_', 'sftp'],
- file_size=10,
- timeout=1,
- retry_delay=timedelta(seconds=1),
- poke_interval=1,
- hook=self.hook)
-
- # When
- task.execute(None)
-
- # Then
- # Nothing happens, nothing is raised exec is ok
-
- def test_should_match_regex_but_filesize(self):
- """
- test the file size behaviour with regex
- :return:
- """
- # Given
- self.log.debug('#' * 10)
- self.log.debug('Running %s', self._testMethodName)
- self.log.debug('#' * 10)
- compiled_regex = re.compile("test[1-2]file")
- task = HdfsSensorRegex(task_id='Should_match_the_regex_but_filesize',
- filepath='/datadirectory/regex_dir',
- regex=compiled_regex,
- file_size=20,
- timeout=1,
- retry_delay=timedelta(seconds=1),
- poke_interval=1,
- hook=self.hook)
-
- # When
- # Then
- with self.assertRaises(AirflowSensorTimeout):
- task.execute(None)
-
- def test_should_match_regex_but_copyingext(self):
- """
- test the file size behaviour with regex
- :return:
- """
- # Given
- self.log.debug('#' * 10)
- self.log.debug('Running %s', self._testMethodName)
- self.log.debug('#' * 10)
- compiled_regex = re.compile("copying_file_\d+.txt")
- task = HdfsSensorRegex(task_id='Should_match_the_regex_but_filesize',
- filepath='/datadirectory/regex_dir',
- regex=compiled_regex,
- ignored_ext=['_COPYING_', 'sftp'],
- file_size=20,
- timeout=1,
- retry_delay=timedelta(seconds=1),
- poke_interval=1,
- hook=self.hook)
-
- # When
- # Then
- with self.assertRaises(AirflowSensorTimeout):
- task.execute(None)