You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/03/22 19:16:20 UTC

[airflow] 09/31: Fix filesystem sensor for directories (#21729)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8d486ff8de0cac681c1e0c23afc715728b433ec5
Author: Mikhail Ilchenko <pr...@gmail.com>
AuthorDate: Tue Mar 1 12:18:34 2022 +0300

    Fix filesystem sensor for directories (#21729)
    
    Fix walking through wildcarded directory in `FileSensor.poke` method
    
    (cherry picked from commit 6b0ca646ec849af91fe8a10d3d5656cafa3ed4bd)
---
 airflow/sensors/filesystem.py    |  2 +-
 tests/sensors/test_filesystem.py | 36 ++++++++++++++++++++++++++++++++++--
 2 files changed, 35 insertions(+), 3 deletions(-)

diff --git a/airflow/sensors/filesystem.py b/airflow/sensors/filesystem.py
index 130be5c..1a4711c 100644
--- a/airflow/sensors/filesystem.py
+++ b/airflow/sensors/filesystem.py
@@ -65,7 +65,7 @@ class FileSensor(BaseSensorOperator):
                 self.log.info('Found File %s last modified: %s', str(path), str(mod_time))
                 return True
 
-            for _, _, files in os.walk(full_path):
+            for _, _, files in os.walk(path):
                 if len(files) > 0:
                     return True
         return False
diff --git a/tests/sensors/test_filesystem.py b/tests/sensors/test_filesystem.py
index 4d23331..e696f1e 100644
--- a/tests/sensors/test_filesystem.py
+++ b/tests/sensors/test_filesystem.py
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-import os.path
+import os
 import shutil
 import tempfile
 import unittest
@@ -131,6 +131,38 @@ class TestFileSensor(unittest.TestCase):
             task._hook = self.hook
             task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
+    def test_wildcard_empty_directory(self):
+        with tempfile.TemporaryDirectory() as temp_dir:
+            with tempfile.TemporaryDirectory(suffix='subdir', dir=temp_dir):
+                task = FileSensor(
+                    task_id='test',
+                    filepath=os.path.join(temp_dir, '*dir'),
+                    fs_conn_id='fs_default',
+                    dag=self.dag,
+                    timeout=0,
+                )
+                task._hook = self.hook
+
+                # No files in dir
+                with pytest.raises(AirflowSensorTimeout):
+                    task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+    def test_wildcard_directory_with_files(self):
+        with tempfile.TemporaryDirectory() as temp_dir:
+            with tempfile.TemporaryDirectory(suffix='subdir', dir=temp_dir) as subdir:
+                task = FileSensor(
+                    task_id='test',
+                    filepath=os.path.join(temp_dir, '*dir'),
+                    fs_conn_id='fs_default',
+                    dag=self.dag,
+                    timeout=0,
+                )
+                task._hook = self.hook
+
+                # `touch` the file in subdir
+                open(os.path.join(subdir, 'file'), 'a').close()
+                task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
     def test_wildcared_directory(self):
         temp_dir = tempfile.mkdtemp()
         subdir = tempfile.mkdtemp(dir=temp_dir)
@@ -146,7 +178,7 @@ class TestFileSensor(unittest.TestCase):
         task._hook = self.hook
 
         try:
-            # `touch` the dir
+            # `touch` the file in subdir
             open(subdir + "/file", "a").close()
             task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
         finally: