You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/26 20:01:35 UTC

[GitHub] Fokko closed pull request #2450: [Airflow-1413] Fix FTPSensor failing on error message with unexpected text.

Fokko closed pull request #2450: [Airflow-1413] Fix FTPSensor failing on error message with unexpected text.
URL: https://github.com/apache/incubator-airflow/pull/2450
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, the diff is reproduced
below (GitHub would not otherwise display it once the request is closed):

diff --git a/airflow/contrib/sensors/ftp_sensor.py b/airflow/contrib/sensors/ftp_sensor.py
index efdedd7a62..69900d5205 100644
--- a/airflow/contrib/sensors/ftp_sensor.py
+++ b/airflow/contrib/sensors/ftp_sensor.py
@@ -17,6 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
 import ftplib
+import re
 
 from airflow.contrib.hooks.ftp_hook import FTPHook, FTPSHook
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
@@ -26,33 +27,65 @@
 class FTPSensor(BaseSensorOperator):
     """
     Waits for a file or directory to be present on FTP.
-
-    :param path: Remote file or directory path
-    :type path: str
-    :param ftp_conn_id: The connection to run the sensor against
-    :type ftp_conn_id: str
     """
+
     template_fields = ('path',)
 
+    """Errors that are transient in nature, and where action can be retried"""
+    transient_errors = [421, 425, 426, 434, 450, 451, 452]
+
+    error_code_pattern = re.compile("([\d]+)")
+
     @apply_defaults
-    def __init__(self, path, ftp_conn_id='ftp_default', *args, **kwargs):
+    def __init__(
+            self,
+            path,
+            ftp_conn_id='ftp_default',
+            fail_on_transient_errors=True,
+            *args,
+            **kwargs):
+        """
+        Create a new FTP sensor
+
+        :param path: Remote file or directory path
+        :type path: str
+        :param fail_on_transient_errors: Fail on all errors,
+            including 4xx transient errors. Default True.
+        :type fail_on_transient_errors: bool
+        :param ftp_conn_id: The connection to run the sensor against
+        :type ftp_conn_id: str
+        """
+
         super(FTPSensor, self).__init__(*args, **kwargs)
 
         self.path = path
         self.ftp_conn_id = ftp_conn_id
+        self.fail_on_transient_errors = fail_on_transient_errors
 
     def _create_hook(self):
         """Return connection hook."""
         return FTPHook(ftp_conn_id=self.ftp_conn_id)
 
+    def _get_error_code(self, e):
+        """Extract error code from ftp exception"""
+        try:
+            matches = self.error_code_pattern.match(str(e))
+            code = int(matches.group(0))
+            return code
+        except ValueError:
+            return e
+
     def poke(self, context):
         with self._create_hook() as hook:
             self.log.info('Poking for %s', self.path)
             try:
                 hook.get_mod_time(self.path)
             except ftplib.error_perm as e:
-                error = str(e).split(None, 1)
-                if error[1] != "Can't check for file existence":
+                self.log.info('Ftp error encountered: %s', str(e))
+                error_code = self._get_error_code(e)
+                if ((error_code != 550) and
+                        (self.fail_on_transient_errors or
+                            (error_code not in self.transient_errors))):
                     raise e
 
                 return False
diff --git a/tests/contrib/sensors/test_ftp_sensor.py b/tests/contrib/sensors/test_ftp_sensor.py
index cf0fdb4918..8996dc08e0 100644
--- a/tests/contrib/sensors/test_ftp_sensor.py
+++ b/tests/contrib/sensors/test_ftp_sensor.py
@@ -49,8 +49,13 @@ def test_poke(self):
                        task_id="test_task")
 
         self.hook_mock.get_mod_time.side_effect = \
-            [error_perm("550: Can't check for file existence"), None]
+            [error_perm("550: Can't check for file existence"),
+                error_perm("550: Directory or file does not exist"),
+                error_perm("550 - Directory or file does not exist"),
+                None]
 
+        self.assertFalse(op.poke(None))
+        self.assertFalse(op.poke(None))
         self.assertFalse(op.poke(None))
         self.assertTrue(op.poke(None))
 
@@ -66,6 +71,28 @@ def test_poke_fails_due_error(self):
 
         self.assertTrue("530" in str(context.exception))
 
+    def test_poke_fail_on_transient_error(self):
+        op = FTPSensor(path="foobar.json", ftp_conn_id="bob_ftp",
+                       task_id="test_task")
+
+        self.hook_mock.get_mod_time.side_effect = \
+            error_perm("434: Host unavailable")
+
+        with self.assertRaises(error_perm) as context:
+            op.execute(None)
+
+        self.assertTrue("434" in str(context.exception))
+
+    def test_poke_ignore_transient_error(self):
+        op = FTPSensor(path="foobar.json", ftp_conn_id="bob_ftp",
+                       task_id="test_task", fail_on_transient_errors=False)
+
+        self.hook_mock.get_mod_time.side_effect = \
+            [error_perm("434: Host unavailable"), None]
+
+        self.assertFalse(op.poke(None))
+        self.assertTrue(op.poke(None))
+
 
 if __name__ == '__main__':
     unittest.main()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services