You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by el...@apache.org on 2022/12/14 07:47:54 UTC
[airflow] branch main updated: Fix template rendered bucket_key in S3KeySensor (#28340)
This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 381160c0f6 Fix template rendered bucket_key in S3KeySensor (#28340)
381160c0f6 is described below
commit 381160c0f63a15957a631da9db875f98bb8e9d64
Author: Sung Yun <10...@users.noreply.github.com>
AuthorDate: Wed Dec 14 02:47:46 2022 -0500
Fix template rendered bucket_key in S3KeySensor (#28340)
---
airflow/providers/amazon/aws/sensors/s3.py | 7 ++++--
tests/providers/amazon/aws/sensors/test_s3_key.py | 27 +++++++++++++++++++++++
2 files changed, 32 insertions(+), 2 deletions(-)
diff --git a/airflow/providers/amazon/aws/sensors/s3.py b/airflow/providers/amazon/aws/sensors/s3.py
index 40e31596d7..57c9393c0c 100644
--- a/airflow/providers/amazon/aws/sensors/s3.py
+++ b/airflow/providers/amazon/aws/sensors/s3.py
@@ -86,7 +86,7 @@ class S3KeySensor(BaseSensorOperator):
):
super().__init__(**kwargs)
self.bucket_name = bucket_name
- self.bucket_key = [bucket_key] if isinstance(bucket_key, str) else bucket_key
+ self.bucket_key = bucket_key
self.wildcard_match = wildcard_match
self.check_fn = check_fn
self.aws_conn_id = aws_conn_id
@@ -125,7 +125,10 @@ class S3KeySensor(BaseSensorOperator):
return True
def poke(self, context: Context):
- return all(self._check_key(key) for key in self.bucket_key)
+ if isinstance(self.bucket_key, str):
+ return self._check_key(self.bucket_key)
+ else:
+ return all(self._check_key(key) for key in self.bucket_key)
def get_hook(self) -> S3Hook:
"""Create and return an S3Hook"""
diff --git a/tests/providers/amazon/aws/sensors/test_s3_key.py b/tests/providers/amazon/aws/sensors/test_s3_key.py
index 8d560e2c82..f0832d3df9 100644
--- a/tests/providers/amazon/aws/sensors/test_s3_key.py
+++ b/tests/providers/amazon/aws/sensors/test_s3_key.py
@@ -126,6 +126,33 @@ class TestS3KeySensor:
mock_head_object.assert_called_once_with("key", "bucket")
+ @mock.patch("airflow.providers.amazon.aws.sensors.s3.S3Hook.head_object")
+ def test_parse_list_of_bucket_keys_from_jinja(self, mock_head_object):
+ mock_head_object.return_value = None
+ mock_head_object.side_effect = [{"ContentLength": 0}, {"ContentLength": 0}]
+
+ Variable.set("test_bucket_key", ["s3://bucket/file1", "s3://bucket/file2"])
+
+ execution_date = timezone.datetime(2020, 1, 1)
+
+ dag = DAG("test_s3_key", start_date=execution_date, render_template_as_native_obj=True)
+ op = S3KeySensor(
+ task_id="s3_key_sensor",
+ bucket_key="{{ var.value.test_bucket_key }}",
+ bucket_name=None,
+ dag=dag,
+ )
+
+ dag_run = DagRun(dag_id=dag.dag_id, execution_date=execution_date, run_id="test")
+ ti = TaskInstance(task=op)
+ ti.dag_run = dag_run
+ context = ti.get_template_context()
+ ti.render_templates(context)
+ op.poke(None)
+
+ mock_head_object.assert_any_call("file1", "bucket")
+ mock_head_object.assert_any_call("file2", "bucket")
+
@mock.patch("airflow.providers.amazon.aws.sensors.s3.S3Hook.head_object")
def test_poke(self, mock_head_object):
op = S3KeySensor(task_id="s3_key_sensor", bucket_key="s3://test_bucket/file")