You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/03/22 19:16:20 UTC
[airflow] 09/31: Fix filesystem sensor for directories (#21729)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 8d486ff8de0cac681c1e0c23afc715728b433ec5
Author: Mikhail Ilchenko <pr...@gmail.com>
AuthorDate: Tue Mar 1 12:18:34 2022 +0300
Fix filesystem sensor for directories (#21729)
Fix walking through wildcarded directory in `FileSensor.poke` method
(cherry picked from commit 6b0ca646ec849af91fe8a10d3d5656cafa3ed4bd)
---
airflow/sensors/filesystem.py | 2 +-
tests/sensors/test_filesystem.py | 36 ++++++++++++++++++++++++++++++++++--
2 files changed, 35 insertions(+), 3 deletions(-)
diff --git a/airflow/sensors/filesystem.py b/airflow/sensors/filesystem.py
index 130be5c..1a4711c 100644
--- a/airflow/sensors/filesystem.py
+++ b/airflow/sensors/filesystem.py
@@ -65,7 +65,7 @@ class FileSensor(BaseSensorOperator):
self.log.info('Found File %s last modified: %s', str(path), str(mod_time))
return True
- for _, _, files in os.walk(full_path):
+ for _, _, files in os.walk(path):
if len(files) > 0:
return True
return False
diff --git a/tests/sensors/test_filesystem.py b/tests/sensors/test_filesystem.py
index 4d23331..e696f1e 100644
--- a/tests/sensors/test_filesystem.py
+++ b/tests/sensors/test_filesystem.py
@@ -17,7 +17,7 @@
# under the License.
#
-import os.path
+import os
import shutil
import tempfile
import unittest
@@ -131,6 +131,38 @@ class TestFileSensor(unittest.TestCase):
task._hook = self.hook
task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+ def test_wildcard_empty_directory(self):
+ with tempfile.TemporaryDirectory() as temp_dir:
+ with tempfile.TemporaryDirectory(suffix='subdir', dir=temp_dir):
+ task = FileSensor(
+ task_id='test',
+ filepath=os.path.join(temp_dir, '*dir'),
+ fs_conn_id='fs_default',
+ dag=self.dag,
+ timeout=0,
+ )
+ task._hook = self.hook
+
+ # No files in dir
+ with pytest.raises(AirflowSensorTimeout):
+ task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+ def test_wildcard_directory_with_files(self):
+ with tempfile.TemporaryDirectory() as temp_dir:
+ with tempfile.TemporaryDirectory(suffix='subdir', dir=temp_dir) as subdir:
+ task = FileSensor(
+ task_id='test',
+ filepath=os.path.join(temp_dir, '*dir'),
+ fs_conn_id='fs_default',
+ dag=self.dag,
+ timeout=0,
+ )
+ task._hook = self.hook
+
+ # `touch` the file in subdir
+ open(os.path.join(subdir, 'file'), 'a').close()
+ task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
def test_wildcared_directory(self):
temp_dir = tempfile.mkdtemp()
subdir = tempfile.mkdtemp(dir=temp_dir)
@@ -146,7 +178,7 @@ class TestFileSensor(unittest.TestCase):
task._hook = self.hook
try:
- # `touch` the dir
+ # `touch` the file in subdir
open(subdir + "/file", "a").close()
task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
finally: