You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by el...@apache.org on 2022/12/19 11:05:57 UTC
[airflow] branch main updated: Fix EmrAddStepsOperator wait_for_completion parameter is not working (#28052)
This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e377e869da Fix EmrAddStepsOperator wait_for_completion parameter is not working (#28052)
e377e869da is described below
commit e377e869da9f0e42ac1e0a615347cf7cd6565d54
Author: 2h-kim <11...@users.noreply.github.com>
AuthorDate: Mon Dec 19 20:05:51 2022 +0900
Fix EmrAddStepsOperator wait_for_completion parameter is not working (#28052)
* Fix EmrAddStepsOperator wait_for_completion parameter is not working
Co-authored-by: Vincent <97...@users.noreply.github.com>
Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
Co-authored-by: eladkal <45...@users.noreply.github.com>
---
airflow/providers/amazon/aws/operators/emr.py | 5 ++++-
.../amazon/aws/operators/test_emr_add_steps.py | 18 ++++++++++++++++++
2 files changed, 22 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/amazon/aws/operators/emr.py b/airflow/providers/amazon/aws/operators/emr.py
index f8a6929ea0..3728994df3 100644
--- a/airflow/providers/amazon/aws/operators/emr.py
+++ b/airflow/providers/amazon/aws/operators/emr.py
@@ -111,10 +111,13 @@ class EmrAddStepsOperator(BaseOperator):
# steps may arrive as a string representing a list
# e.g. if we used XCom or a file then: steps="[{ step1 }, { step2 }]"
steps = self.steps
+ wait_for_completion = self.wait_for_completion
if isinstance(steps, str):
steps = ast.literal_eval(steps)
- return emr_hook.add_job_flow_steps(job_flow_id=job_flow_id, steps=steps, wait_for_completion=True)
+ return emr_hook.add_job_flow_steps(
+ job_flow_id=job_flow_id, steps=steps, wait_for_completion=wait_for_completion
+ )
class EmrStartNotebookExecutionOperator(BaseOperator):
diff --git a/tests/providers/amazon/aws/operators/test_emr_add_steps.py b/tests/providers/amazon/aws/operators/test_emr_add_steps.py
index 5b5f51030b..c088c3bc2e 100644
--- a/tests/providers/amazon/aws/operators/test_emr_add_steps.py
+++ b/tests/providers/amazon/aws/operators/test_emr_add_steps.py
@@ -207,3 +207,21 @@ class TestEmrAddStepsOperator:
with pytest.raises(AirflowException) as ctx:
operator.execute(self.mock_context)
assert str(ctx.value) == f"No cluster found for name: {cluster_name}"
+
+ @patch("airflow.providers.amazon.aws.hooks.emr.EmrHook.add_job_flow_steps")
+ def test_wait_for_completion(self, mock_add_job_flow_steps):
+ job_flow_id = "j-8989898989"
+ operator = EmrAddStepsOperator(
+ task_id="test_task",
+ job_flow_id=job_flow_id,
+ aws_conn_id="aws_default",
+ dag=DAG("test_dag_id", default_args=self.args),
+ wait_for_completion=False,
+ )
+ operator.execute(self.mock_context)
+
+ mock_add_job_flow_steps.assert_called_once_with(
+ job_flow_id=job_flow_id,
+ steps=[],
+ wait_for_completion=False,
+ )