Posted to commits@airflow.apache.org by po...@apache.org on 2023/02/20 01:02:18 UTC

[airflow] branch main updated: Add deferrable mode to DataprocInstantiateWorkflowTemplateOperator (#28618)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1677d80e65 Add deferrable mode to DataprocInstantiateWorkflowTemplateOperator (#28618)
1677d80e65 is described below

commit 1677d80e6573acfc7a706ac25c4ee3a353071f7b
Author: Beata Kossakowska <10...@users.noreply.github.com>
AuthorDate: Mon Feb 20 02:02:08 2023 +0100

    Add deferrable mode to DataprocInstantiateWorkflowTemplateOperator (#28618)
    
    Co-authored-by: Beata Kossakowska <bk...@google.com>
---
 airflow/providers/google/cloud/hooks/dataproc.py   |   8 ++
 .../providers/google/cloud/operators/dataproc.py   |  42 +++++++-
 .../providers/google/cloud/triggers/dataproc.py    |  90 +++++++++++++++++
 .../operators/cloud/dataproc.rst                   |   8 ++
 .../providers/google/cloud/hooks/test_dataproc.py  |  11 +++
 .../google/cloud/operators/test_dataproc.py        |  32 ++++++
 .../google/cloud/triggers/test_dataproc.py         | 110 +++++++++++++++++++--
 .../cloud/dataproc/example_dataproc_workflow.py    |  17 +++-
 8 files changed, 303 insertions(+), 15 deletions(-)
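
In short, with deferrable=True the operator submits the workflow, then defers to the triggerer instead of holding a worker slot while the operation runs; the new DataprocWorkflowTrigger polls the long-running operation and execute_complete() handles the final event. A minimal usage sketch (project, region, and template ids are illustrative, not from this commit):

    from airflow.providers.google.cloud.operators.dataproc import (
        DataprocInstantiateWorkflowTemplateOperator,
    )

    trigger_workflow_async = DataprocInstantiateWorkflowTemplateOperator(
        task_id="trigger_workflow_async",
        project_id="my-project",             # illustrative
        region="europe-west1",               # illustrative
        template_id="my-workflow-template",  # illustrative
        deferrable=True,                     # hand off polling to the triggerer
        polling_interval_seconds=10,         # how often the trigger checks the operation
    )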

diff --git a/airflow/providers/google/cloud/hooks/dataproc.py b/airflow/providers/google/cloud/hooks/dataproc.py
index 5b77f162df..2a09d7c61f 100644
--- a/airflow/providers/google/cloud/hooks/dataproc.py
+++ b/airflow/providers/google/cloud/hooks/dataproc.py
@@ -28,6 +28,7 @@ from google.api_core.exceptions import ServerError
 from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
 from google.api_core.operation import Operation
 from google.api_core.operation_async import AsyncOperation
+from google.api_core.operations_v1.operations_client import OperationsClient
 from google.api_core.retry import Retry
 from google.cloud.dataproc_v1 import (
     Batch,
@@ -1047,6 +1048,10 @@ class DataprocAsyncHook(GoogleBaseHook):
             credentials=self.get_credentials(), client_info=CLIENT_INFO, client_options=client_options
         )
 
+    def get_operations_client(self, region: str) -> OperationsClient:
+        """Returns OperationsClient"""
+        return self.get_template_client(region=region).transport.operations_client
+
     @GoogleBaseHook.fallback_to_default_project_id
     async def create_cluster(
         self,
@@ -1459,6 +1464,9 @@ class DataprocAsyncHook(GoogleBaseHook):
         )
         return operation
 
+    async def get_operation(self, region: str, operation_name: str):
+        return await self.get_operations_client(region).get_operation(name=operation_name)
+
     @GoogleBaseHook.fallback_to_default_project_id
     async def get_job(
         self,
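
The two hook additions above give the trigger an async path to the operation status: get_operations_client() exposes the template client's google-api-core OperationsClient, and get_operation() awaits a single status fetch. A sketch of how a caller would use it, assuming default credentials (region and operation name are illustrative):

    import asyncio

    from airflow.providers.google.cloud.hooks.dataproc import DataprocAsyncHook

    async def operation_is_done() -> bool:
        hook = DataprocAsyncHook(gcp_conn_id="google_cloud_default")
        # Fetch the current state of the long-running operation by name.
        operation = await hook.get_operation(
            region="europe-west1",  # illustrative
            operation_name="projects/p/regions/r/operations/o",  # illustrative
        )
        return operation.done

    print(asyncio.run(operation_is_done()))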
diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index 987edddf19..240b402696 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -55,6 +55,7 @@ from airflow.providers.google.cloud.triggers.dataproc import (
     DataprocClusterTrigger,
     DataprocDeleteClusterTrigger,
     DataprocSubmitTrigger,
+    DataprocWorkflowTrigger,
 )
 from airflow.utils import timezone
 
@@ -1688,7 +1689,7 @@ class DataprocInstantiateWorkflowTemplateOperator(BaseOperator):
 
     .. seealso::
         Please refer to:
-        https://cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.workflowTemplates/instantiate
+        https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.workflowTemplates/instantiate
 
     :param template_id: The id of the template. (templated)
     :param project_id: The ID of the google cloud project in which
@@ -1717,6 +1718,8 @@ class DataprocInstantiateWorkflowTemplateOperator(BaseOperator):
         If set as a sequence, the identities from the list must grant
         Service Account Token Creator IAM role to the directly preceding identity, with first
         account from the list granting this role to the originating account (templated).
+    :param deferrable: Run the operator in deferrable mode.
+    :param polling_interval_seconds: Time (seconds) to wait between calls to check the run status.
     """
 
     template_fields: Sequence[str] = ("template_id", "impersonation_chain", "request_id", "parameters")
@@ -1737,10 +1740,13 @@ class DataprocInstantiateWorkflowTemplateOperator(BaseOperator):
         metadata: Sequence[tuple[str, str]] = (),
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
+        deferrable: bool = False,
+        polling_interval_seconds: int = 10,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
-
+        if deferrable and polling_interval_seconds <= 0:
+            raise ValueError("Invalid value for polling_interval_seconds. Expected value greater than 0")
         self.template_id = template_id
         self.parameters = parameters
         self.version = version
@@ -1752,6 +1758,8 @@ class DataprocInstantiateWorkflowTemplateOperator(BaseOperator):
         self.request_id = request_id
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
+        self.deferrable = deferrable
+        self.polling_interval_seconds = polling_interval_seconds
 
     def execute(self, context: Context):
         hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
@@ -1772,8 +1780,34 @@ class DataprocInstantiateWorkflowTemplateOperator(BaseOperator):
             context=context, task_instance=self, url=DATAPROC_WORKFLOW_LINK, resource=self.workflow_id
         )
         self.log.info("Template instantiated. Workflow Id : %s", self.workflow_id)
-        operation.result()
-        self.log.info("Workflow %s completed successfully", self.workflow_id)
+        if not self.deferrable:
+            hook.wait_for_operation(timeout=self.timeout, result_retry=self.retry, operation=operation)
+            self.log.info("Workflow %s completed successfully", self.workflow_id)
+        else:
+            self.defer(
+                trigger=DataprocWorkflowTrigger(
+                    template_name=self.template_id,
+                    name=operation.operation.name,
+                    project_id=self.project_id,
+                    region=self.region,
+                    gcp_conn_id=self.gcp_conn_id,
+                    impersonation_chain=self.impersonation_chain,
+                    polling_interval_seconds=self.polling_interval_seconds,
+                ),
+                method_name="execute_complete",
+            )
+
+    def execute_complete(self, context, event=None) -> None:
+        """
+        Callback for when the trigger fires; returns immediately.
+        Relies on the trigger to throw an exception; otherwise it assumes
+        execution was successful.
+        """
+        if event["status"] == "failed" or event["status"] == "error":
+            self.log.exception("Unexpected error in the operation.")
+            raise AirflowException(event["message"])
+
+        self.log.info("Workflow %s completed successfully", event["operation_name"])
 
 
 class DataprocInstantiateInlineWorkflowTemplateOperator(BaseOperator):
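
The execute()/execute_complete() split above is the standard deferrable-operator contract: execute() serializes a trigger and exits, and once the trigger yields its terminal event the task resumes in execute_complete(). Since both "failed" and "error" events are fatal, the event handling can be read as a small pure function; a sketch (helper name is hypothetical):

    from airflow.exceptions import AirflowException

    def handle_workflow_event(event: dict) -> str:
        """Raise on a terminal error event, otherwise return the operation name."""
        if event["status"] in ("failed", "error"):
            # "error" carries the operation's error message; "failed" carries
            # the text of an exception raised while polling.
            raise AirflowException(event["message"])
        return event["operation_name"]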
diff --git a/airflow/providers/google/cloud/triggers/dataproc.py b/airflow/providers/google/cloud/triggers/dataproc.py
index 265e04489d..02c5d25f6a 100644
--- a/airflow/providers/google/cloud/triggers/dataproc.py
+++ b/airflow/providers/google/cloud/triggers/dataproc.py
@@ -310,3 +310,93 @@ class DataprocDeleteClusterTrigger(BaseTrigger):
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
         )
+
+
+class DataprocWorkflowTrigger(BaseTrigger):
+    """
+    Trigger that periodically polls information from Dataproc API to verify status.
+    Implementation leverages asynchronous transport.
+    """
+
+    def __init__(
+        self,
+        template_name: str,
+        name: str,
+        region: str,
+        project_id: str | None = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        delegate_to: str | None = None,
+        polling_interval_seconds: int = 10,
+    ):
+        super().__init__()
+        self.gcp_conn_id = gcp_conn_id
+        self.template_name = template_name
+        self.name = name
+        self.impersonation_chain = impersonation_chain
+        self.project_id = project_id
+        self.region = region
+        self.polling_interval_seconds = polling_interval_seconds
+        self.delegate_to = delegate_to
+        if delegate_to:
+            warnings.warn(
+                "'delegate_to' parameter is deprecated, please use 'impersonation_chain'", DeprecationWarning
+            )
+
+    def serialize(self):
+        return (
+            "airflow.providers.google.cloud.triggers.dataproc.DataprocWorkflowTrigger",
+            {
+                "template_name": self.template_name,
+                "name": self.name,
+                "project_id": self.project_id,
+                "region": self.region,
+                "gcp_conn_id": self.gcp_conn_id,
+                "delegate_to": self.delegate_to,
+                "impersonation_chain": self.impersonation_chain,
+                "polling_interval_seconds": self.polling_interval_seconds,
+            },
+        )
+
+    async def run(self) -> AsyncIterator["TriggerEvent"]:
+        hook = self._get_hook()
+        while True:
+            try:
+                operation = await hook.get_operation(region=self.region, operation_name=self.name)
+                if operation.done:
+                    if operation.error.message:
+                        yield TriggerEvent(
+                            {
+                                "operation_name": operation.name,
+                                "operation_done": operation.done,
+                                "status": "error",
+                                "message": operation.error.message,
+                            }
+                        )
+                        return
+                    yield TriggerEvent(
+                        {
+                            "operation_name": operation.name,
+                            "operation_done": operation.done,
+                            "status": "success",
+                            "message": "Operation is successfully ended.",
+                        }
+                    )
+                    return
+                else:
+                    self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                    await asyncio.sleep(self.polling_interval_seconds)
+            except Exception as e:
+                self.log.exception("Exception occurred while checking operation status.")
+                yield TriggerEvent(
+                    {
+                        "status": "failed",
+                        "message": str(e),
+                    }
+                )
+
+    def _get_hook(self) -> DataprocAsyncHook:  # type: ignore[override]
+        return DataprocAsyncHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
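
Like the other triggers in this module, DataprocWorkflowTrigger is a serialize()/run() pair: serialize() returns the classpath plus constructor kwargs so the triggerer process can rebuild the trigger, and run() is an async generator that polls until it can yield a terminal TriggerEvent. The shape in isolation, stripped of the Dataproc specifics (class and module names are hypothetical):

    import asyncio

    from airflow.triggers.base import BaseTrigger, TriggerEvent

    class PollingTrigger(BaseTrigger):
        def __init__(self, name: str, interval: int = 10):
            super().__init__()
            self.name = name
            self.interval = interval

        def serialize(self):
            # Classpath + kwargs: everything the triggerer needs to rebuild us.
            return ("my_module.PollingTrigger", {"name": self.name, "interval": self.interval})

        async def run(self):
            while True:
                if await self._check_done():  # stand-in for a real API call
                    yield TriggerEvent({"status": "success", "name": self.name})
                    return
                await asyncio.sleep(self.interval)

        async def _check_done(self) -> bool:
            return True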
diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst b/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst
index f92c9993cb..5705f59f90 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst
@@ -262,6 +262,14 @@ Once a workflow is created users can trigger it using
     :start-after: [START how_to_cloud_dataproc_trigger_workflow_template]
     :end-before: [END how_to_cloud_dataproc_trigger_workflow_template]
 
+You can also run this operation in deferrable mode:
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_dataproc_trigger_workflow_template_async]
+    :end-before: [END how_to_cloud_dataproc_trigger_workflow_template_async]
+
 The inline operator is an alternative. It creates a workflow, runs it, and deletes it afterwards:
 :class:`~airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateInlineWorkflowTemplateOperator`:
 
diff --git a/tests/providers/google/cloud/hooks/test_dataproc.py b/tests/providers/google/cloud/hooks/test_dataproc.py
index 4d5eeac175..38661dfc2a 100644
--- a/tests/providers/google/cloud/hooks/test_dataproc.py
+++ b/tests/providers/google/cloud/hooks/test_dataproc.py
@@ -729,6 +729,17 @@ class TestDataprocAsyncHook:
             metadata=(),
         )
 
+    @pytest.mark.asyncio
+    @async_mock.patch(DATAPROC_STRING.format("DataprocAsyncHook.get_operation"))
+    async def test_get_operation(self, mock_client):
+        mock_client.return_value = None
+        hook = DataprocAsyncHook(
+            gcp_conn_id="google_cloud_default", delegate_to=None, impersonation_chain=None
+        )
+        await hook.get_operation(region=GCP_LOCATION, operation_name="operation_name")
+        mock_client.assert_called_once()
+        mock_client.assert_called_with(region=GCP_LOCATION, operation_name="operation_name")
+
     @mock.patch(DATAPROC_STRING.format("DataprocAsyncHook.get_template_client"))
     def test_instantiate_workflow_template_missing_region(self, mock_client):
         with pytest.raises(TypeError):
diff --git a/tests/providers/google/cloud/operators/test_dataproc.py b/tests/providers/google/cloud/operators/test_dataproc.py
index e462023f42..ce407aa465 100644
--- a/tests/providers/google/cloud/operators/test_dataproc.py
+++ b/tests/providers/google/cloud/operators/test_dataproc.py
@@ -59,6 +59,7 @@ from airflow.providers.google.cloud.triggers.dataproc import (
     DataprocClusterTrigger,
     DataprocDeleteClusterTrigger,
     DataprocSubmitTrigger,
+    DataprocWorkflowTrigger,
 )
 from airflow.providers.google.common.consts import GOOGLE_DEFAULT_DEFERRABLE_METHOD_NAME
 from airflow.serialization.serialized_objects import SerializedDAG
@@ -441,6 +442,7 @@ class TestDataprocClusterCreateOperator(DataprocClusterTestBase):
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
     def test_execute(self, mock_hook, to_dict_mock):
         self.extra_links_manager_mock.attach_mock(mock_hook, "hook")
+        mock_hook.return_value.create_cluster.result.return_value = None
         create_cluster_args = {
             "region": GCP_REGION,
             "project_id": GCP_PROJECT,
@@ -1363,6 +1365,36 @@ class TestDataprocWorkflowTemplateInstantiateOperator(unittest.TestCase):
             metadata=METADATA,
         )
 
+    @mock.patch(DATAPROC_PATH.format("DataprocHook"))
+    @mock.patch(DATAPROC_TRIGGERS_PATH.format("DataprocAsyncHook"))
+    def test_execute_call_defer_method(self, mock_trigger_hook, mock_hook):
+        operator = DataprocInstantiateWorkflowTemplateOperator(
+            task_id=TASK_ID,
+            template_id=TEMPLATE_ID,
+            region=GCP_REGION,
+            project_id=GCP_PROJECT,
+            version=2,
+            parameters={},
+            request_id=REQUEST_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+            deferrable=True,
+        )
+
+        with pytest.raises(TaskDeferred) as exc:
+            operator.execute(mock.MagicMock())
+
+        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN)
+
+        mock_hook.return_value.instantiate_workflow_template.assert_called_once()
+
+        mock_hook.return_value.wait_for_operation.assert_not_called()
+        assert isinstance(exc.value.trigger, DataprocWorkflowTrigger)
+        assert exc.value.method_name == GOOGLE_DEFAULT_DEFERRABLE_METHOD_NAME
+
 
 @pytest.mark.need_serialized_dag
 @mock.patch(DATAPROC_PATH.format("DataprocHook"))
diff --git a/tests/providers/google/cloud/triggers/test_dataproc.py b/tests/providers/google/cloud/triggers/test_dataproc.py
index 90ea36c31d..27f41997b9 100644
--- a/tests/providers/google/cloud/triggers/test_dataproc.py
+++ b/tests/providers/google/cloud/triggers/test_dataproc.py
@@ -22,8 +22,13 @@ from asyncio import Future
 
 import pytest
 from google.cloud.dataproc_v1 import Batch, ClusterStatus
+from google.rpc.status_pb2 import Status
 
-from airflow.providers.google.cloud.triggers.dataproc import DataprocBatchTrigger, DataprocClusterTrigger
+from airflow.providers.google.cloud.triggers.dataproc import (
+    DataprocBatchTrigger,
+    DataprocClusterTrigger,
+    DataprocWorkflowTrigger,
+)
 from airflow.triggers.base import TriggerEvent
 from tests.providers.google.cloud.utils.compat import async_mock
 
@@ -39,10 +44,12 @@ BATCH_CONFIG = {
 TEST_CLUSTER_NAME = "cluster_name"
 TEST_POLL_INTERVAL = 5
 TEST_GCP_CONN_ID = "google_cloud_default"
+TEST_TEMPLATE_NAME = "template_name"
+TEST_OPERATION_NAME = "name"
 
 
 @pytest.fixture
-def trigger():
+def cluster_trigger():
     return DataprocClusterTrigger(
         cluster_name=TEST_CLUSTER_NAME,
         project_id=TEST_PROJECT_ID,
@@ -66,6 +73,19 @@ def batch_trigger():
     return trigger
 
 
+@pytest.fixture
+def workflow_trigger():
+    return DataprocWorkflowTrigger(
+        template_name=TEST_TEMPLATE_NAME,
+        name=TEST_OPERATION_NAME,
+        project_id=TEST_PROJECT_ID,
+        region=TEST_REGION,
+        gcp_conn_id=TEST_GCP_CONN_ID,
+        impersonation_chain=None,
+        polling_interval_seconds=TEST_POLL_INTERVAL,
+    )
+
+
 @pytest.fixture()
 def async_get_cluster():
     def func(**kwargs):
@@ -90,9 +110,21 @@ def async_get_batch():
     return func
 
 
+@pytest.fixture()
+def async_get_operation():
+    def func(**kwargs):
+        m = async_mock.MagicMock()
+        m.configure_mock(**kwargs)
+        f = Future()
+        f.set_result(m)
+        return f
+
+    return func
+
+
 class TestDataprocClusterTrigger:
-    def test_async_cluster_trigger_serialization_should_execute_successfully(self, trigger):
-        classpath, kwargs = trigger.serialize()
+    def test_async_cluster_trigger_serialization_should_execute_successfully(self, cluster_trigger):
+        classpath, kwargs = cluster_trigger.serialize()
         assert classpath == "airflow.providers.google.cloud.triggers.dataproc.DataprocClusterTrigger"
         assert kwargs == {
             "cluster_name": TEST_CLUSTER_NAME,
@@ -106,7 +138,7 @@ class TestDataprocClusterTrigger:
     @pytest.mark.asyncio
     @async_mock.patch("airflow.providers.google.cloud.hooks.dataproc.DataprocAsyncHook.get_cluster")
     async def test_async_cluster_triggers_on_success_should_execute_successfully(
-        self, mock_hook, trigger, async_get_cluster
+        self, mock_hook, cluster_trigger, async_get_cluster
     ):
         mock_hook.return_value = async_get_cluster(
             project_id=TEST_PROJECT_ID,
@@ -115,7 +147,7 @@ class TestDataprocClusterTrigger:
             status=ClusterStatus(state=ClusterStatus.State.RUNNING),
         )
 
-        generator = trigger.run()
+        generator = cluster_trigger.run()
         actual_event = await generator.asend(None)
 
         expected_event = TriggerEvent(
@@ -129,7 +161,9 @@ class TestDataprocClusterTrigger:
 
     @pytest.mark.asyncio
     @async_mock.patch("airflow.providers.google.cloud.hooks.dataproc.DataprocAsyncHook.get_cluster")
-    async def test_async_cluster_trigger_run_returns_error_event(self, mock_hook, trigger, async_get_cluster):
+    async def test_async_cluster_trigger_run_returns_error_event(
+        self, mock_hook, cluster_trigger, async_get_cluster
+    ):
         mock_hook.return_value = async_get_cluster(
             project_id=TEST_PROJECT_ID,
             region=TEST_REGION,
@@ -137,7 +171,7 @@ class TestDataprocClusterTrigger:
             status=ClusterStatus(state=ClusterStatus.State.ERROR),
         )
 
-        actual_event = await (trigger.run()).asend(None)
+        actual_event = await (cluster_trigger.run()).asend(None)
         await asyncio.sleep(0.5)
 
         expected_event = TriggerEvent(
@@ -151,7 +185,9 @@ class TestDataprocClusterTrigger:
 
     @pytest.mark.asyncio
     @async_mock.patch("airflow.providers.google.cloud.hooks.dataproc.DataprocAsyncHook.get_cluster")
-    async def test_cluster_run_loop_is_still_running(self, mock_hook, trigger, caplog, async_get_cluster):
+    async def test_cluster_run_loop_is_still_running(
+        self, mock_hook, cluster_trigger, caplog, async_get_cluster
+    ):
         mock_hook.return_value = async_get_cluster(
             project_id=TEST_PROJECT_ID,
             region=TEST_REGION,
@@ -161,7 +197,7 @@ class TestDataprocClusterTrigger:
 
         caplog.set_level(logging.INFO)
 
-        task = asyncio.create_task(trigger.run().__anext__())
+        task = asyncio.create_task(cluster_trigger.run().__anext__())
         await asyncio.sleep(0.5)
 
         assert not task.done()
@@ -248,3 +284,57 @@ class TestDataprocBatchTrigger:
         assert not task.done()
         assert f"Current state is: {Batch.State.RUNNING}"
         assert f"Sleeping for {TEST_POLL_INTERVAL} seconds."
+
+
+class TestDataprocWorkflowTrigger:
+    def test_async_workflow_trigger_serialization_should_execute_successfully(self, workflow_trigger):
+        classpath, kwargs = workflow_trigger.serialize()
+        assert classpath == "airflow.providers.google.cloud.triggers.dataproc.DataprocWorkflowTrigger"
+        assert kwargs == {
+            "template_name": TEST_TEMPLATE_NAME,
+            "name": TEST_OPERATION_NAME,
+            "project_id": TEST_PROJECT_ID,
+            "region": TEST_REGION,
+            "gcp_conn_id": TEST_GCP_CONN_ID,
+            "delegate_to": None,
+            "impersonation_chain": None,
+            "polling_interval_seconds": TEST_POLL_INTERVAL,
+        }
+
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.google.cloud.triggers.dataproc.DataprocWorkflowTrigger._get_hook")
+    async def test_async_workflow_triggers_on_success_should_execute_successfully(
+        self, mock_hook, workflow_trigger, async_get_operation
+    ):
+        mock_hook.return_value.get_operation.return_value = async_get_operation(
+            name=TEST_OPERATION_NAME, done=True, response={}, error=Status(message="")
+        )
+
+        expected_event = TriggerEvent(
+            {
+                "operation_name": TEST_OPERATION_NAME,
+                "operation_done": True,
+                "status": "success",
+                "message": "Operation is successfully ended.",
+            }
+        )
+        actual_event = await (workflow_trigger.run()).asend(None)
+        assert expected_event == actual_event
+
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.google.cloud.triggers.dataproc.DataprocWorkflowTrigger._get_hook")
+    async def test_async_workflow_triggers_on_error(self, mock_hook, workflow_trigger, async_get_operation):
+        mock_hook.return_value.get_operation.return_value = async_get_operation(
+            name=TEST_OPERATION_NAME, done=True, response={}, error=Status(message="test_error")
+        )
+
+        expected_event = TriggerEvent(
+            {
+                "operation_name": TEST_OPERATION_NAME,
+                "operation_done": True,
+                "status": "error",
+                "message": "test_error",
+            }
+        )
+        actual_event = await (workflow_trigger.run()).asend(None)
+        assert expected_event == actual_event
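
The async_get_operation fixture above relies on the fact that awaiting an already-resolved Future returns its result immediately, which lets a plain MagicMock stand in for the awaited operation. The same idea in isolation (names are illustrative):

    import asyncio
    from unittest import mock

    def resolved(value):
        """Return an awaitable that resolves immediately to `value`."""
        f = asyncio.Future()
        f.set_result(value)
        return f

    async def main():
        operation = mock.MagicMock(done=True)
        result = await resolved(operation)  # no real API call, no waiting
        print(result.done)  # True

    asyncio.run(main())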
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py
index 6f04c3dc3b..778b24342e 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py
@@ -83,13 +83,28 @@ with models.DAG(
     )
     # [END how_to_cloud_dataproc_trigger_workflow_template]
 
+    # [START how_to_cloud_dataproc_trigger_workflow_template_async]
+    trigger_workflow_async = DataprocInstantiateWorkflowTemplateOperator(
+        task_id="trigger_workflow_async",
+        region=REGION,
+        project_id=PROJECT_ID,
+        template_id=WORKFLOW_NAME,
+        deferrable=True,
+    )
+    # [END how_to_cloud_dataproc_trigger_workflow_template_async]
+
     # [START how_to_cloud_dataproc_instantiate_inline_workflow_template]
     instantiate_inline_workflow_template = DataprocInstantiateInlineWorkflowTemplateOperator(
         task_id="instantiate_inline_workflow_template", template=WORKFLOW_TEMPLATE, region=REGION
     )
     # [END how_to_cloud_dataproc_instantiate_inline_workflow_template]
 
-    create_workflow_template >> trigger_workflow >> instantiate_inline_workflow_template
+    (
+        create_workflow_template
+        >> trigger_workflow
+        >> instantiate_inline_workflow_template
+        >> trigger_workflow_async
+    )
 
     from tests.system.utils.watcher import watcher