You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink and is not preserved in this plain-text rendering.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/08/12 18:12:51 UTC

[GitHub] Fokko commented on a change in pull request #3560: [AIRFLOW-2697] Drop snakebite in favour of hdfs3

Fokko commented on a change in pull request #3560: [AIRFLOW-2697] Drop snakebite in favour of hdfs3
URL: https://github.com/apache/incubator-airflow/pull/3560#discussion_r209463504
 
 

 ##########
 File path: tests/sensors/test_hdfs_sensor.py
 ##########
 @@ -7,85 +7,368 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-import unittest
 
 from datetime import timedelta
+import fnmatch
+import posixpath
+import unittest
+
+import mock
+
+from airflow import models
+from airflow.sensors.hdfs_sensor import HdfsFileSensor, HdfsFolderSensor, HdfsHook
+
+
+class MockHdfs3Client(object):
+    """Mock hdfs3 client for testing purposes."""
+
+    def __init__(self, file_details):
+        self._file_details = {entry["name"]: entry for entry in file_details}
+
+    @classmethod
+    def from_file_details(cls, file_details, test_instance, conn_id="hdfs_default"):
+        """Builds mock hdsf3 client using file_details."""
+
+        mock_client = cls(file_details)
+        mock_params = models.Connection(conn_id=conn_id)
+
+        # Setup mock for get_connection.
+        patcher = mock.patch.object(
+            HdfsHook, "get_connection", return_value=mock_params
+        )
+        test_instance.addCleanup(patcher.stop)
+        patcher.start()
+
+        # Setup mock for get_conn.
+        patcher = mock.patch.object(HdfsHook, "get_conn", return_value=mock_client)
+        test_instance.addCleanup(patcher.stop)
+        patcher.start()
+
+        return mock_client, mock_params
+
+    def glob(self, pattern):
+        """Returns glob of files matching pattern."""
+
+        # Implements a non-recursive glob on file names.
+        pattern_dir = posixpath.dirname(pattern)
+        pattern_base = posixpath.basename(pattern)
+
+        # Ensure pattern_dir ends with '/'.
+        pattern_dir = posixpath.join(pattern_dir, "")
+
+        for file_path in self._file_details.keys():
+            file_basename = posixpath.basename(file_path)
+            if file_path.startswith(pattern_dir) and \
+                    fnmatch.fnmatch(file_basename, pattern_base):
+                yield file_path
+
+    def isdir(self, path):
+        """Returns true if path is a directory."""
+
+        try:
+            details = self._file_details[path]
+        except KeyError:
+            raise IOError()
 
-from airflow import configuration
-from airflow.exceptions import AirflowSensorTimeout
-from airflow.sensors.hdfs_sensor import HdfsSensor
-from airflow.utils.timezone import datetime
-from tests.core import FakeHDFSHook
+        return details["kind"] == "directory"
 
-configuration.load_test_config()
+    def info(self, path):
+        """Returns info for given file path."""
 
-DEFAULT_DATE = datetime(2015, 1, 1)
-TEST_DAG_ID = 'unit_test_dag'
+        try:
+            return self._file_details[path]
+        except KeyError:
+            raise IOError()
 
 
-class HdfsSensorTests(unittest.TestCase):
+class HdfsFileSensorTests(unittest.TestCase):
+    """Tests for the HdfsFileSensor class."""
 
     def setUp(self):
-        self.hook = FakeHDFSHook
+        file_details = [
+            {"kind": "directory", "name": "/data/empty", "size": 0},
+            {"kind": "directory", "name": "/data/not_empty", "size": 0},
+            {"kind": "file", "name": "/data/not_empty/small.txt", "size": 10},
+            {"kind": "file", "name": "/data/not_empty/large.txt", "size": 10000000},
+            {"kind": "file", "name": "/data/not_empty/file.txt._COPYING_"},
+        ]
 
-    def test_legacy_file_exist(self):
-        """
-        Test the legacy behaviour
-        :return:
-        """
-        # When
-        task = HdfsSensor(task_id='Should_be_file_legacy',
-                          filepath='/datadirectory/datafile',
-                          timeout=1,
-                          retry_delay=timedelta(seconds=1),
-                          poke_interval=1,
-                          hook=self.hook)
-        task.execute(None)
-
-        # Then
-        # Nothing happens, nothing is raised exec is ok
-
-    def test_legacy_file_exist_but_filesize(self):
+        self._mock_client, self._mock_params = MockHdfs3Client.from_file_details(
+            file_details, test_instance=self
+        )
+
+        self._default_task_kws = {
+            "timeout": 1,
+            "retry_delay": timedelta(seconds=1),
+            "poke_interval": 1,
+        }
+
+    def test_existing_file(self):
+        """Tests poking for existing file."""
+
+        task = HdfsFileSensor(
+            task_id="existing_file",
+            pattern="/data/not_empty/small.txt",
+            **self._default_task_kws
+        )
+        self.assertTrue(task.poke(context={}))
+
+    def test_existing_file_glob(self):
+        """Tests poking for existing file with glob."""
+
+        task = HdfsFileSensor(
+            task_id="existing_file",
+            pattern="/data/not_empty/*.txt",
+            **self._default_task_kws
+        )
+        self.assertTrue(task.poke(context={}))
+
+    def test_nonexisting_file(self):
+        """Tests poking for non-existing file."""
+
+        task = HdfsFileSensor(
+            task_id="nonexisting_file",
+            pattern="/data/not_empty/random.txt",
+            **self._default_task_kws
+        )
+        self.assertFalse(task.poke(context={}))
+
+    def test_nonexisting_file_glob(self):
+        """Tests poking for non-existing file with glob."""
+
+        task = HdfsFileSensor(
+            task_id="existing_file",
+            pattern="/data/not_empty/*.xml",
+            **self._default_task_kws
+        )
+        self.assertFalse(task.poke(context={}))
+
+    def test_nonexisting_path(self):
+        """Tests poking for non-existing path."""
+
+        task = HdfsFileSensor(
+            task_id="nonexisting_path",
+            pattern="/data/not_empty/small.txt",
+            **self._default_task_kws
+        )
+
+        with mock.patch.object(self._mock_client, "glob", side_effect=IOError):
+            self.assertFalse(task.poke(context={}))
+
+    def test_file_filter_size_small(self):
+        """Tests poking for file while filtering for file size (too small)."""
+
+        task = HdfsFileSensor(
+            task_id="existing_file_too_small",
+            pattern="/data/not_empty/small.txt",
+            min_size=1,
+            **self._default_task_kws
+        )
+        self.assertFalse(task.poke(context={}))
+
+    def test_file_filter_size_large(self):
+        """Tests poking for file while filtering for file size (large)."""
+
+        task = HdfsFileSensor(
+            task_id="existing_file_large",
+            pattern="/data/not_empty/large.txt",
+            min_size=1,
+            **self._default_task_kws
+        )
+        self.assertTrue(task.poke(context={}))
+
+    def test_file_filter_ext(self):
+        """Tests poking for file while filtering for extension."""
+
+        task = HdfsFileSensor(
+            task_id="existing_file_large",
+            pattern="/data/not_empty/f*",
+            ignore_exts=("_COPYING_",),
+            **self._default_task_kws
+        )
+        self.assertFalse(task.poke(context={}))
+
+
+class HdfsFolderSensorTests(unittest.TestCase):
 
 Review comment:
   as well :)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services