You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/03/01 09:20:02 UTC

[airflow] branch main updated: Fix filesystem sensor for directories (#21729)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6b0ca64  Fix filesystem sensor for directories (#21729)
6b0ca64 is described below

commit 6b0ca646ec849af91fe8a10d3d5656cafa3ed4bd
Author: Mikhail Ilchenko <pr...@gmail.com>
AuthorDate: Tue Mar 1 12:18:34 2022 +0300

    Fix filesystem sensor for directories (#21729)
    
    Fix walking through a wildcarded directory in the `FileSensor.poke` method:
    `os.walk` now receives `path` instead of `full_path`.
---
 airflow/sensors/filesystem.py    |  2 +-
 tests/sensors/test_filesystem.py | 36 ++++++++++++++++++++++++++++++++++--
 2 files changed, 35 insertions(+), 3 deletions(-)

diff --git a/airflow/sensors/filesystem.py b/airflow/sensors/filesystem.py
index 0ccf658..9aa5690 100644
--- a/airflow/sensors/filesystem.py
+++ b/airflow/sensors/filesystem.py
@@ -63,7 +63,7 @@ class FileSensor(BaseSensorOperator):
                 self.log.info('Found File %s last modified: %s', str(path), mod_time)
                 return True
 
-            for _, _, files in os.walk(full_path):
+            for _, _, files in os.walk(path):
                 if len(files) > 0:
                     return True
         return False
diff --git a/tests/sensors/test_filesystem.py b/tests/sensors/test_filesystem.py
index 4d23331..e696f1e 100644
--- a/tests/sensors/test_filesystem.py
+++ b/tests/sensors/test_filesystem.py
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-import os.path
+import os
 import shutil
 import tempfile
 import unittest
@@ -131,6 +131,38 @@ class TestFileSensor(unittest.TestCase):
             task._hook = self.hook
             task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
+    def test_wildcard_empty_directory(self):
+        with tempfile.TemporaryDirectory() as temp_dir:
+            with tempfile.TemporaryDirectory(suffix='subdir', dir=temp_dir):
+                task = FileSensor(
+                    task_id='test',
+                    filepath=os.path.join(temp_dir, '*dir'),
+                    fs_conn_id='fs_default',
+                    dag=self.dag,
+                    timeout=0,
+                )
+                task._hook = self.hook
+
+                # No files in dir
+                with pytest.raises(AirflowSensorTimeout):
+                    task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+    def test_wildcard_directory_with_files(self):
+        with tempfile.TemporaryDirectory() as temp_dir:
+            with tempfile.TemporaryDirectory(suffix='subdir', dir=temp_dir) as subdir:
+                task = FileSensor(
+                    task_id='test',
+                    filepath=os.path.join(temp_dir, '*dir'),
+                    fs_conn_id='fs_default',
+                    dag=self.dag,
+                    timeout=0,
+                )
+                task._hook = self.hook
+
+                # `touch` the file in subdir
+                open(os.path.join(subdir, 'file'), 'a').close()
+                task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
     def test_wildcared_directory(self):
         temp_dir = tempfile.mkdtemp()
         subdir = tempfile.mkdtemp(dir=temp_dir)
@@ -146,7 +178,7 @@ class TestFileSensor(unittest.TestCase):
         task._hook = self.hook
 
         try:
-            # `touch` the dir
+            # `touch` the file in subdir
             open(subdir + "/file", "a").close()
             task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
         finally: