You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by el...@apache.org on 2022/12/19 11:05:57 UTC

[airflow] branch main updated: Fix EmrAddStepsOperature wait_for_completion parameter is not working (#28052)

This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e377e869da Fix EmrAddStepsOperature wait_for_completion parameter is not working (#28052)
e377e869da is described below

commit e377e869da9f0e42ac1e0a615347cf7cd6565d54
Author: 2h-kim <11...@users.noreply.github.com>
AuthorDate: Mon Dec 19 20:05:51 2022 +0900

    Fix EmrAddStepsOperature wait_for_completion parameter is not working (#28052)
    
    * Fix EmrAddStepsOperature wait_for_completion parameter is not working
    
    Co-authored-by: Vincent <97...@users.noreply.github.com>
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
    Co-authored-by: eladkal <45...@users.noreply.github.com>
---
 airflow/providers/amazon/aws/operators/emr.py          |  5 ++++-
 .../amazon/aws/operators/test_emr_add_steps.py         | 18 ++++++++++++++++++
 2 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/amazon/aws/operators/emr.py b/airflow/providers/amazon/aws/operators/emr.py
index f8a6929ea0..3728994df3 100644
--- a/airflow/providers/amazon/aws/operators/emr.py
+++ b/airflow/providers/amazon/aws/operators/emr.py
@@ -111,10 +111,13 @@ class EmrAddStepsOperator(BaseOperator):
         # steps may arrive as a string representing a list
         # e.g. if we used XCom or a file then: steps="[{ step1 }, { step2 }]"
         steps = self.steps
+        wait_for_completion = self.wait_for_completion
         if isinstance(steps, str):
             steps = ast.literal_eval(steps)
 
-        return emr_hook.add_job_flow_steps(job_flow_id=job_flow_id, steps=steps, wait_for_completion=True)
+        return emr_hook.add_job_flow_steps(
+            job_flow_id=job_flow_id, steps=steps, wait_for_completion=wait_for_completion
+        )
 
 
 class EmrStartNotebookExecutionOperator(BaseOperator):
diff --git a/tests/providers/amazon/aws/operators/test_emr_add_steps.py b/tests/providers/amazon/aws/operators/test_emr_add_steps.py
index 5b5f51030b..c088c3bc2e 100644
--- a/tests/providers/amazon/aws/operators/test_emr_add_steps.py
+++ b/tests/providers/amazon/aws/operators/test_emr_add_steps.py
@@ -207,3 +207,21 @@ class TestEmrAddStepsOperator:
             with pytest.raises(AirflowException) as ctx:
                 operator.execute(self.mock_context)
             assert str(ctx.value) == f"No cluster found for name: {cluster_name}"
+
+    @patch("airflow.providers.amazon.aws.hooks.emr.EmrHook.add_job_flow_steps")
+    def test_wait_for_completion(self, mock_add_job_flow_steps):
+        job_flow_id = "j-8989898989"
+        operator = EmrAddStepsOperator(
+            task_id="test_task",
+            job_flow_id=job_flow_id,
+            aws_conn_id="aws_default",
+            dag=DAG("test_dag_id", default_args=self.args),
+            wait_for_completion=False,
+        )
+        operator.execute(self.mock_context)
+
+        mock_add_job_flow_steps.assert_called_once_with(
+            job_flow_id=job_flow_id,
+            steps=[],
+            wait_for_completion=False,
+        )