You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/10/31 05:35:27 UTC

[airflow] branch main updated: Allow values in WorkflowsCreateExecutionOperator execution argument to be dicts (#27361)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 332c01d6e0 Allow values in WorkflowsCreateExecutionOperator execution argument to be dicts (#27361)
332c01d6e0 is described below

commit 332c01d6e0bef41740e8fbc2c9600e7b3066615b
Author: Robert Karish <ro...@gmail.com>
AuthorDate: Mon Oct 31 01:35:19 2022 -0400

    Allow values in WorkflowsCreateExecutionOperator execution argument to be dicts (#27361)
    
     Convert execution dict values to str (#27165)
---
 airflow/providers/google/cloud/hooks/workflows.py  |  1 +
 .../providers/google/cloud/hooks/test_workflows.py | 25 ++++++++++++++++++++++
 2 files changed, 26 insertions(+)

diff --git a/airflow/providers/google/cloud/hooks/workflows.py b/airflow/providers/google/cloud/hooks/workflows.py
index 9d3fcea51c..a8a99252c7 100644
--- a/airflow/providers/google/cloud/hooks/workflows.py
+++ b/airflow/providers/google/cloud/hooks/workflows.py
@@ -241,6 +241,7 @@ class WorkflowsHook(GoogleBaseHook):
         metadata = metadata or ()
         client = self.get_executions_client()
         parent = f"projects/{project_id}/locations/{location}/workflows/{workflow_id}"
+        execution = {k: str(v) if isinstance(v, dict) else v for k, v in execution.items()}
         return client.create_execution(
             request={"parent": parent, "execution": execution},
             retry=retry,
diff --git a/tests/providers/google/cloud/hooks/test_workflows.py b/tests/providers/google/cloud/hooks/test_workflows.py
index 42de975d00..fd2e5c5704 100644
--- a/tests/providers/google/cloud/hooks/test_workflows.py
+++ b/tests/providers/google/cloud/hooks/test_workflows.py
@@ -27,6 +27,8 @@ WORKFLOW_ID = "workflow_id"
 EXECUTION_ID = "execution_id"
 WORKFLOW = {"aa": "bb"}
 EXECUTION = {"ccc": "ddd"}
+EXECUTION_NESTED = {"argument": {"project_id": "project_id", "location": "us-east1"}, "test": 1}
+EXECUTION_NESTED_OUTPUT = {"argument": "{'project_id': 'project_id', 'location': 'us-east1'}", "test": 1}
 PROJECT_ID = "airflow-testing"
 METADATA = ()
 TIMEOUT = None
@@ -196,6 +198,29 @@ class TestWorkflowsHook:
             metadata=METADATA,
         )
 
+    @mock.patch(BASE_PATH.format("WorkflowsHook.get_executions_client"))
+    def test_create_execution_with_nested(self, mock_client):
+        result = self.hook.create_execution(
+            workflow_id=WORKFLOW_ID,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            execution=EXECUTION_NESTED,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+        assert mock_client.return_value.create_execution.return_value == result
+        mock_client.return_value.create_execution.assert_called_once_with(
+            request=dict(
+                parent=EXECUTION_PARENT,
+                execution=EXECUTION_NESTED_OUTPUT,
+            ),
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
     @mock.patch(BASE_PATH.format("WorkflowsHook.get_executions_client"))
     def test_get_execution(self, mock_client):
         result = self.hook.get_execution(