You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by tu...@apache.org on 2021/01/28 19:35:20 UTC
[airflow] branch master updated: Add Google Cloud Workflows
Operators (#13366)
This is an automated email from the ASF dual-hosted git repository.
turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 6d6588f Add Google Cloud Workflows Operators (#13366)
6d6588f is described below
commit 6d6588fe2b8bb5fa33e930646d963df3e0530f23
Author: Tomek Urbaszek <tu...@gmail.com>
AuthorDate: Thu Jan 28 20:35:09 2021 +0100
Add Google Cloud Workflows Operators (#13366)
Add Google Cloud Workflows Operators, system test, example and sensor
Co-authored-by: Tobiasz Kędzierski <to...@polidea.com>
---
.../google/cloud/example_dags/example_workflows.py | 197 ++++++
airflow/providers/google/cloud/hooks/workflows.py | 401 ++++++++++++
.../providers/google/cloud/operators/workflows.py | 714 +++++++++++++++++++++
.../providers/google/cloud/sensors/workflows.py | 123 ++++
airflow/providers/google/provider.yaml | 14 +
.../operators/cloud/workflows.rst | 185 ++++++
setup.py | 2 +
.../providers/google/cloud/hooks/test_workflows.py | 256 ++++++++
.../google/cloud/operators/test_workflows.py | 383 +++++++++++
.../cloud/operators/test_workflows_system.py | 29 +
.../google/cloud/sensors/test_workflows.py | 108 ++++
.../google/cloud/utils/gcp_authenticator.py | 1 +
12 files changed, 2413 insertions(+)
diff --git a/airflow/providers/google/cloud/example_dags/example_workflows.py b/airflow/providers/google/cloud/example_dags/example_workflows.py
new file mode 100644
index 0000000..0fab435
--- /dev/null
+++ b/airflow/providers/google/cloud/example_dags/example_workflows.py
@@ -0,0 +1,197 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+
+from airflow import DAG
+from airflow.providers.google.cloud.operators.workflows import (
+ WorkflowsCancelExecutionOperator,
+ WorkflowsCreateExecutionOperator,
+ WorkflowsCreateWorkflowOperator,
+ WorkflowsDeleteWorkflowOperator,
+ WorkflowsGetExecutionOperator,
+ WorkflowsGetWorkflowOperator,
+ WorkflowsListExecutionsOperator,
+ WorkflowsListWorkflowsOperator,
+ WorkflowsUpdateWorkflowOperator,
+)
+from airflow.providers.google.cloud.sensors.workflows import WorkflowExecutionSensor
+from airflow.utils.dates import days_ago
+
+LOCATION = os.environ.get("GCP_WORKFLOWS_LOCATION", "us-central1")  # overridable through the environment
+PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "an-id")  # "an-id" is only a placeholder default
+
+WORKFLOW_ID = os.getenv("GCP_WORKFLOWS_WORKFLOW_ID", "airflow-test-workflow")
+
+# [START how_to_define_workflow]
+WORKFLOW_CONTENT = """
+- getCurrentTime:
+ call: http.get
+ args:
+ url: https://us-central1-workflowsample.cloudfunctions.net/datetime
+ result: currentTime
+- readWikipedia:
+ call: http.get
+ args:
+ url: https://en.wikipedia.org/w/api.php
+ query:
+ action: opensearch
+ search: ${currentTime.body.dayOfTheWeek}
+ result: wikiResult
+- returnResult:
+ return: ${wikiResult.body[1]}
+"""
+
+WORKFLOW = {
+ "description": "Test workflow",
+ "labels": {"airflow-version": "dev"},
+ "source_contents": WORKFLOW_CONTENT,
+}
+# [END how_to_define_workflow]
+
+EXECUTION = {"argument": ""}  # execution payload shared by both create-execution tasks
+
+SLEEP_WORKFLOW_ID = os.getenv("GCP_WORKFLOWS_SLEEP_WORKFLOW_ID", "sleep_workflow")  # used by the *_for_cancel tasks
+SLEEP_WORKFLOW_CONTENT = """
+- someSleep:
+ call: sys.sleep
+ args:
+ seconds: 120
+"""
+
+SLEEP_WORKFLOW = {  # same shape as WORKFLOW, but with the sleeping source
+ "description": "Test workflow",
+ "labels": {"airflow-version": "dev"},
+ "source_contents": SLEEP_WORKFLOW_CONTENT,
+}
+
+
+with DAG("example_cloud_workflows", start_date=days_ago(1), schedule_interval=None) as dag:
+ # [START how_to_create_workflow]
+ create_workflow = WorkflowsCreateWorkflowOperator(
+ task_id="create_workflow",
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ workflow=WORKFLOW,
+ workflow_id=WORKFLOW_ID,
+ )
+ # [END how_to_create_workflow]
+
+ # [START how_to_update_workflow]
+ update_workflows = WorkflowsUpdateWorkflowOperator(
+ task_id="update_workflows",
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ workflow_id=WORKFLOW_ID,
+ update_mask={"paths": ["name", "description"]},
+ )
+ # [END how_to_update_workflow]
+
+ # [START how_to_get_workflow]
+ get_workflow = WorkflowsGetWorkflowOperator(
+ task_id="get_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
+ )
+ # [END how_to_get_workflow]
+
+ # [START how_to_list_workflows]
+ list_workflows = WorkflowsListWorkflowsOperator(
+ task_id="list_workflows",
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ )
+ # [END how_to_list_workflows]
+
+ # [START how_to_delete_workflow]
+ delete_workflow = WorkflowsDeleteWorkflowOperator(
+ task_id="delete_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
+ )
+ # [END how_to_delete_workflow]
+
+ # [START how_to_create_execution]
+ create_execution = WorkflowsCreateExecutionOperator(
+ task_id="create_execution",
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ execution=EXECUTION,
+ workflow_id=WORKFLOW_ID,
+ )
+ # [END how_to_create_execution]
+
+ # [START how_to_wait_for_execution]
+ wait_for_execution = WorkflowExecutionSensor(
+ task_id="wait_for_execution",
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ workflow_id=WORKFLOW_ID,
+ execution_id='{{ task_instance.xcom_pull("create_execution", key="execution_id") }}',
+ )
+ # [END how_to_wait_for_execution]
+
+ # [START how_to_get_execution]
+ get_execution = WorkflowsGetExecutionOperator(
+ task_id="get_execution",
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ workflow_id=WORKFLOW_ID,
+ execution_id='{{ task_instance.xcom_pull("create_execution", key="execution_id") }}',
+ )
+ # [END how_to_get_execution]
+
+ # [START how_to_list_executions]
+ list_executions = WorkflowsListExecutionsOperator(
+ task_id="list_executions", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
+ )
+ # [END how_to_list_executions]
+
+ create_workflow_for_cancel = WorkflowsCreateWorkflowOperator(
+ task_id="create_workflow_for_cancel",
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ workflow=SLEEP_WORKFLOW,
+ workflow_id=SLEEP_WORKFLOW_ID,
+ )
+
+ create_execution_for_cancel = WorkflowsCreateExecutionOperator(
+ task_id="create_execution_for_cancel",
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ execution=EXECUTION,
+ workflow_id=SLEEP_WORKFLOW_ID,
+ )
+
+ # [START how_to_cancel_execution]
+ cancel_execution = WorkflowsCancelExecutionOperator(
+ task_id="cancel_execution",
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ workflow_id=SLEEP_WORKFLOW_ID,
+ execution_id='{{ task_instance.xcom_pull("create_execution_for_cancel", key="execution_id") }}',
+ )
+ # [END how_to_cancel_execution]
+
+ create_workflow >> update_workflows >> [get_workflow, list_workflows]  # create, update, then read back
+ update_workflows >> [create_execution, create_execution_for_cancel]  # executions start only after the update
+
+ create_execution >> wait_for_execution >> [get_execution, list_executions]  # sensor gates the read tasks
+ create_workflow_for_cancel >> create_execution_for_cancel >> cancel_execution  # cancel path uses the sleep workflow
+
+ [cancel_execution, list_executions] >> delete_workflow  # NOTE(review): only WORKFLOW_ID is deleted; SLEEP_WORKFLOW_ID is never cleaned up here
+
+
+if __name__ == '__main__':
+ dag.clear(dag_run_state=None)
+ dag.run()
diff --git a/airflow/providers/google/cloud/hooks/workflows.py b/airflow/providers/google/cloud/hooks/workflows.py
new file mode 100644
index 0000000..6c78350
--- /dev/null
+++ b/airflow/providers/google/cloud/hooks/workflows.py
@@ -0,0 +1,401 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Dict, Optional, Sequence, Tuple, Union
+
+from google.api_core.operation import Operation
+from google.api_core.retry import Retry
+
+# pylint: disable=no-name-in-module
+from google.cloud.workflows.executions_v1beta import Execution, ExecutionsClient
+from google.cloud.workflows.executions_v1beta.services.executions.pagers import ListExecutionsPager
+from google.cloud.workflows_v1beta import Workflow, WorkflowsClient
+from google.cloud.workflows_v1beta.services.workflows.pagers import ListWorkflowsPager
+from google.protobuf.field_mask_pb2 import FieldMask
+
+from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
+
+# pylint: enable=no-name-in-module
+
+
+class WorkflowsHook(GoogleBaseHook):
+ """
+ Hook for the Google Cloud Workflows and Workflow Executions APIs.
+
+ All the methods in the hook where project_id is used must be called with
+ keyword arguments rather than positional.
+ """
+
+ def get_workflows_client(self) -> WorkflowsClient:
+ """Returns a WorkflowsClient built with the hook's credentials and client info."""
+ return WorkflowsClient(credentials=self._get_credentials(), client_info=self.client_info)
+
+ def get_executions_client(self) -> ExecutionsClient:
+ """Returns an ExecutionsClient built with the hook's credentials and client info."""
+ return ExecutionsClient(credentials=self._get_credentials(), client_info=self.client_info)
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def create_workflow(
+ self,
+ workflow: Dict,
+ workflow_id: str,
+ location: str,
+ project_id: str,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ) -> Operation:
+ """
+ Creates a new workflow. If a workflow with the specified name
+ already exists in the specified project and location, the long
+ running operation will return
+ [ALREADY_EXISTS][google.rpc.Code.ALREADY_EXISTS] error.
+
+ :param workflow: Required. Workflow to be created.
+ :type workflow: Dict
+ :param workflow_id: Required. The ID of the workflow to be created.
+ :type workflow_id: str
+ :param project_id: Required. The ID of the Google Cloud project the workflow belongs to.
+ :type project_id: str
+ :param location: Required. The GCP region in which to handle the request.
+ :type location: str
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ """
+ metadata = metadata or ()
+ client = self.get_workflows_client()
+ parent = f"projects/{project_id}/locations/{location}"  # resource path the workflow is created under
+ return client.create_workflow(
+ request={"parent": parent, "workflow": workflow, "workflow_id": workflow_id},
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def get_workflow(
+ self,
+ workflow_id: str,
+ location: str,
+ project_id: str,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ) -> Workflow:
+ """
+ Gets details of a single Workflow.
+
+ :param workflow_id: Required. The ID of the workflow to be retrieved.
+ :type workflow_id: str
+ :param project_id: Required. The ID of the Google Cloud project the workflow belongs to.
+ :type project_id: str
+ :param location: Required. The GCP region in which to handle the request.
+ :type location: str
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ """
+ metadata = metadata or ()
+ client = self.get_workflows_client()
+ name = f"projects/{project_id}/locations/{location}/workflows/{workflow_id}"
+ return client.get_workflow(request={"name": name}, retry=retry, timeout=timeout, metadata=metadata)
+
+ def update_workflow(
+ self,
+ workflow: Union[Dict, Workflow],
+ update_mask: Optional[FieldMask] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ) -> Operation:
+ """
+ Updates an existing workflow.
+ Running this method has no impact on already running
+ executions of the workflow. A new revision of the
+ workflow may be created as a result of a successful
+ update operation. In that case, such revision will be
+ used in new workflow executions.
+
+ :param workflow: Required. Workflow to be updated.
+ :type workflow: Union[Dict, Workflow]
+ :param update_mask: List of fields to be updated. If not present,
+ the entire workflow will be updated.
+ :type update_mask: FieldMask
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ """
+ metadata = metadata or ()
+ client = self.get_workflows_client()
+ return client.update_workflow(
+ request={"workflow": workflow, "update_mask": update_mask},
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def delete_workflow(
+ self,
+ workflow_id: str,
+ location: str,
+ project_id: str,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ) -> Operation:
+ """
+ Deletes a workflow with the specified name.
+ This method also cancels and deletes all running
+ executions of the workflow.
+
+ :param workflow_id: Required. The ID of the workflow to be deleted.
+ :type workflow_id: str
+ :param project_id: Required. The ID of the Google Cloud project the workflow belongs to.
+ :type project_id: str
+ :param location: Required. The GCP region in which to handle the request.
+ :type location: str
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ """
+ metadata = metadata or ()
+ client = self.get_workflows_client()
+ name = f"projects/{project_id}/locations/{location}/workflows/{workflow_id}"
+ return client.delete_workflow(request={"name": name}, retry=retry, timeout=timeout, metadata=metadata)
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def list_workflows(
+ self,
+ location: str,
+ project_id: str,
+ filter_: Optional[str] = None,
+ order_by: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ) -> ListWorkflowsPager:
+ """
+ Lists Workflows in a given project and location.
+ The default order is not specified.
+
+ :param filter_: Filter to restrict results to specific workflows.
+ :type filter_: str
+ :param order_by: Comma-separated list of fields that
+ specify the order of the results. Default sorting order for a field is ascending.
+ To specify descending order for a field, append a "desc" suffix.
+ If not specified, the results will be returned in an unspecified order.
+ :type order_by: str
+ :param project_id: Required. The ID of the Google Cloud project the workflows belong to.
+ :type project_id: str
+ :param location: Required. The GCP region in which to handle the request.
+ :type location: str
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ """
+ metadata = metadata or ()
+ client = self.get_workflows_client()
+ parent = f"projects/{project_id}/locations/{location}"
+
+ return client.list_workflows(
+ request={"parent": parent, "filter": filter_, "order_by": order_by},
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def create_execution(
+ self,
+ workflow_id: str,
+ location: str,
+ project_id: str,
+ execution: Dict,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ) -> Execution:
+ """
+ Creates a new execution using the latest revision of
+ the given workflow.
+
+ :param execution: Required. Input parameters of the execution represented as a dictionary.
+ :type execution: Dict
+ :param workflow_id: Required. The ID of the workflow.
+ :type workflow_id: str
+ :param project_id: Required. The ID of the Google Cloud project the workflow belongs to.
+ :type project_id: str
+ :param location: Required. The GCP region in which to handle the request.
+ :type location: str
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ """
+ metadata = metadata or ()
+ client = self.get_executions_client()
+ parent = f"projects/{project_id}/locations/{location}/workflows/{workflow_id}"  # the workflow is the execution's parent
+ return client.create_execution(
+ request={"parent": parent, "execution": execution},
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def get_execution(
+ self,
+ workflow_id: str,
+ execution_id: str,
+ location: str,
+ project_id: str,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ) -> Execution:
+ """
+ Returns an execution for the given ``workflow_id`` and ``execution_id``.
+
+ :param workflow_id: Required. The ID of the workflow.
+ :type workflow_id: str
+ :param execution_id: Required. The ID of the execution.
+ :type execution_id: str
+ :param project_id: Required. The ID of the Google Cloud project the workflow belongs to.
+ :type project_id: str
+ :param location: Required. The GCP region in which to handle the request.
+ :type location: str
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ """
+ metadata = metadata or ()
+ client = self.get_executions_client()
+ name = f"projects/{project_id}/locations/{location}/workflows/{workflow_id}/executions/{execution_id}"
+ return client.get_execution(request={"name": name}, retry=retry, timeout=timeout, metadata=metadata)
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def cancel_execution(
+ self,
+ workflow_id: str,
+ execution_id: str,
+ location: str,
+ project_id: str,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ) -> Execution:
+ """
+ Cancels an execution using the given ``workflow_id`` and ``execution_id``.
+
+ :param workflow_id: Required. The ID of the workflow.
+ :type workflow_id: str
+ :param execution_id: Required. The ID of the execution.
+ :type execution_id: str
+ :param project_id: Required. The ID of the Google Cloud project the workflow belongs to.
+ :type project_id: str
+ :param location: Required. The GCP region in which to handle the request.
+ :type location: str
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ """
+ metadata = metadata or ()
+ client = self.get_executions_client()
+ name = f"projects/{project_id}/locations/{location}/workflows/{workflow_id}/executions/{execution_id}"
+ return client.cancel_execution(
+ request={"name": name}, retry=retry, timeout=timeout, metadata=metadata
+ )
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def list_executions(
+ self,
+ workflow_id: str,
+ location: str,
+ project_id: str,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ) -> ListExecutionsPager:
+ """
+ Returns a list of executions which belong to the
+ workflow with the given name. The method returns
+ executions of all workflow revisions. Returned
+ executions are ordered by their start time (newest
+ first).
+
+ :param workflow_id: Required. The ID of the workflow whose executions are listed.
+ :type workflow_id: str
+ :param project_id: Required. The ID of the Google Cloud project the workflow belongs to.
+ :type project_id: str
+ :param location: Required. The GCP region in which to handle the request.
+ :type location: str
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ """
+ metadata = metadata or ()
+ client = self.get_executions_client()
+ parent = f"projects/{project_id}/locations/{location}/workflows/{workflow_id}"
+ return client.list_executions(
+ request={"parent": parent}, retry=retry, timeout=timeout, metadata=metadata
+ )
diff --git a/airflow/providers/google/cloud/operators/workflows.py b/airflow/providers/google/cloud/operators/workflows.py
new file mode 100644
index 0000000..c7fc96d
--- /dev/null
+++ b/airflow/providers/google/cloud/operators/workflows.py
@@ -0,0 +1,714 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import hashlib
+import json
+import re
+import uuid
+from datetime import datetime, timedelta
+from typing import Dict, Optional, Sequence, Tuple, Union
+
+import pytz
+from google.api_core.exceptions import AlreadyExists
+from google.api_core.retry import Retry
+
+# pylint: disable=no-name-in-module
+from google.cloud.workflows.executions_v1beta import Execution
+from google.cloud.workflows_v1beta import Workflow
+
+# pylint: enable=no-name-in-module
+from google.protobuf.field_mask_pb2 import FieldMask
+
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.hooks.workflows import WorkflowsHook
+
+
+class WorkflowsCreateWorkflowOperator(BaseOperator):
+ """
+ Creates a new workflow. If a workflow with the specified name
+ already exists in the specified project and location, the long
+ running operation will return
+ [ALREADY_EXISTS][google.rpc.Code.ALREADY_EXISTS] error.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:WorkflowsCreateWorkflowOperator`
+
+ :param workflow: Required. Workflow to be created.
+ :type workflow: Dict
+ :param workflow_id: Required. The ID of the workflow to be created.
+ :type workflow_id: str
+ :param project_id: Required. The ID of the Google Cloud project the workflow belongs to.
+ :type project_id: str
+ :param location: Required. The GCP region in which to handle the request.
+ :type location: str
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ """
+
+ template_fields = ("location", "workflow", "workflow_id")
+ template_fields_renderers = {"workflow": "json"}
+
+ def __init__(
+ self,
+ *,
+ workflow: Dict,
+ workflow_id: str,
+ location: str,
+ project_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ gcp_conn_id: str = "google_cloud_default",
+ force_rerun: bool = False,  # when True, _workflow_id() ignores workflow_id and derives a fresh uuid4-based ID
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+
+ self.workflow = workflow
+ self.workflow_id = workflow_id
+ self.location = location
+ self.project_id = project_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+ self.force_rerun = force_rerun
+
+ def _workflow_id(self, context):
+ if self.workflow_id and not self.force_rerun:
+ # If users provide workflow id then assuring the idempotency
+ # is on their side
+ return self.workflow_id
+
+ if self.force_rerun:
+ hash_base = str(uuid.uuid4())  # random, so every run yields a different ID
+ else:
+ hash_base = json.dumps(self.workflow, sort_keys=True)  # deterministic for the same workflow dict
+
+ # We are limited by allowed length of workflow_id so
+ # we use hash of whole information
+ exec_date = context['execution_date'].isoformat()
+ base = f"airflow_{self.dag_id}_{self.task_id}_{exec_date}_{hash_base}"
+ workflow_id = hashlib.md5(base.encode()).hexdigest()  # NOTE(review): digest may start with a digit — confirm the API accepts such IDs
+ return re.sub(r"[:\-+.]", "_", workflow_id)  # NOTE(review): hexdigest() yields only [0-9a-f], so this looks like a no-op
+
+ def execute(self, context):
+ hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+ workflow_id = self._workflow_id(context)
+
+ self.log.info("Creating workflow")
+ try:
+ operation = hook.create_workflow(
+ workflow=self.workflow,
+ workflow_id=workflow_id,
+ location=self.location,
+ project_id=self.project_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ workflow = operation.result()
+ except AlreadyExists:  # the workflow is already there: fetch it and return it instead of failing
+ workflow = hook.get_workflow(
+ workflow_id=workflow_id,
+ location=self.location,
+ project_id=self.project_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ return Workflow.to_dict(workflow)
+
+
+class WorkflowsUpdateWorkflowOperator(BaseOperator):
+ """
+ Updates an existing workflow.
+ Running this method has no impact on already running
+ executions of the workflow. A new revision of the
+ workflow may be created as a result of a successful
+ update operation. In that case, such revision will be
+ used in new workflow executions.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:WorkflowsUpdateWorkflowOperator`
+
+ :param workflow_id: Required. The ID of the workflow to be updated.
+ :type workflow_id: str
+ :param location: Required. The GCP region in which to handle the request.
+ :type location: str
+ :param project_id: Required. The ID of the Google Cloud project the workflow belongs to.
+ :type project_id: str
+ :param update_mask: List of fields to be updated. If not present,
+ the entire workflow will be updated.
+ :type update_mask: FieldMask
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ """
+
+ template_fields = ("workflow_id", "update_mask")  # NOTE(review): "location" is templated in the create/delete operators but not here — confirm intended
+ template_fields_renderers = {"update_mask": "json"}
+
+ def __init__(
+ self,
+ *,
+ workflow_id: str,
+ location: str,
+ project_id: Optional[str] = None,
+ update_mask: Optional[FieldMask] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+
+ self.workflow_id = workflow_id
+ self.location = location
+ self.project_id = project_id
+ self.update_mask = update_mask
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context):
+ hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+
+ workflow = hook.get_workflow(  # fetch the current workflow; it becomes the update payload below
+ workflow_id=self.workflow_id,
+ project_id=self.project_id,
+ location=self.location,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ self.log.info("Updating workflow")
+ operation = hook.update_workflow(  # NOTE(review): the fetched workflow is sent back unmodified — confirm this is the intended payload
+ workflow=workflow,
+ update_mask=self.update_mask,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ workflow = operation.result()
+ return Workflow.to_dict(workflow)
+
+
+class WorkflowsDeleteWorkflowOperator(BaseOperator):
+ """
+ Deletes a workflow with the specified name.
+ This method also cancels and deletes all running
+ executions of the workflow.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:WorkflowsDeleteWorkflowOperator`
+
+ :param workflow_id: Required. The ID of the workflow to be deleted.
+ :type workflow_id: str
+ :param project_id: The ID of the Google Cloud project the workflow belongs to.
+ Defaults to ``None``, which is forwarded to ``WorkflowsHook`` as-is.
+ :type project_id: str
+ :param location: Required. The GCP region in which to handle the request.
+ :type location: str
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ :param gcp_conn_id: The connection ID passed to ``WorkflowsHook``.
+ :type gcp_conn_id: str
+ :param impersonation_chain: Optional impersonation chain (a string or a sequence of strings)
+ passed unchanged to ``WorkflowsHook``.
+ :type impersonation_chain: Union[str, Sequence[str]]
+ """
+
+ template_fields = ("location", "workflow_id")
+
+ def __init__(
+ self,
+ *,
+ workflow_id: str,
+ location: str,
+ project_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+
+ self.workflow_id = workflow_id
+ self.location = location
+ self.project_id = project_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context):
+ hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+ self.log.info("Deleting workflow %s", self.workflow_id)
+ operation = hook.delete_workflow(
+ workflow_id=self.workflow_id,
+ location=self.location,
+ project_id=self.project_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ # Resolve the returned operation; its result is discarded, so the task returns ``None``.
+ operation.result()
+
+
+class WorkflowsListWorkflowsOperator(BaseOperator):
+ """
+ Lists Workflows in a given project and location.
+ The default order is not specified.
+
+ The operator returns a list with one ``Workflow.to_dict`` result per listed workflow.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:WorkflowsListWorkflowsOperator`
+
+ :param filter_: Filter to restrict results to specific workflows.
+ :type filter_: str
+ :param order_by: Comma-separated list of fields that
+ specify the order of the results. Default sorting order for a field is ascending.
+ To specify descending order for a field, append a "desc" suffix.
+ If not specified, the results will be returned in an unspecified order.
+ :type order_by: str
+ :param project_id: The ID of the Google Cloud project whose workflows are listed.
+ Defaults to ``None``, which is forwarded to ``WorkflowsHook`` as-is.
+ :type project_id: str
+ :param location: Required. The GCP region in which to handle the request.
+ :type location: str
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ :param gcp_conn_id: The connection ID passed to ``WorkflowsHook``.
+ :type gcp_conn_id: str
+ :param impersonation_chain: Optional impersonation chain (a string or a sequence of strings)
+ passed unchanged to ``WorkflowsHook``.
+ :type impersonation_chain: Union[str, Sequence[str]]
+ """
+
+ template_fields = ("location", "order_by", "filter_")
+
+ def __init__(
+ self,
+ *,
+ location: str,
+ project_id: Optional[str] = None,
+ filter_: Optional[str] = None,
+ order_by: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+
+ self.filter_ = filter_
+ self.order_by = order_by
+ self.location = location
+ self.project_id = project_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context):
+ hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+ self.log.info("Retrieving workflows")
+ workflows_iter = hook.list_workflows(
+ filter_=self.filter_,
+ order_by=self.order_by,
+ location=self.location,
+ project_id=self.project_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ # The iterable returned by the hook is consumed completely and each item converted to a dict.
+ return [Workflow.to_dict(w) for w in workflows_iter]
+
+
+class WorkflowsGetWorkflowOperator(BaseOperator):
+ """
+ Gets details of a single Workflow.
+
+ The operator returns the workflow converted with ``Workflow.to_dict``.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:WorkflowsGetWorkflowOperator`
+
+ :param workflow_id: Required. The ID of the workflow to retrieve.
+ :type workflow_id: str
+ :param project_id: The ID of the Google Cloud project the workflow belongs to.
+ Defaults to ``None``, which is forwarded to ``WorkflowsHook`` as-is.
+ :type project_id: str
+ :param location: Required. The GCP region in which to handle the request.
+ :type location: str
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ :param gcp_conn_id: The connection ID passed to ``WorkflowsHook``.
+ :type gcp_conn_id: str
+ :param impersonation_chain: Optional impersonation chain (a string or a sequence of strings)
+ passed unchanged to ``WorkflowsHook``.
+ :type impersonation_chain: Union[str, Sequence[str]]
+ """
+
+ template_fields = ("location", "workflow_id")
+
+ def __init__(
+ self,
+ *,
+ workflow_id: str,
+ location: str,
+ project_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+
+ self.workflow_id = workflow_id
+ self.location = location
+ self.project_id = project_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context):
+ hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+ self.log.info("Retrieving workflow")
+ workflow = hook.get_workflow(
+ workflow_id=self.workflow_id,
+ location=self.location,
+ project_id=self.project_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ return Workflow.to_dict(workflow)
+
+
+class WorkflowsCreateExecutionOperator(BaseOperator):
+ """
+ Creates a new execution using the latest revision of
+ the given workflow.
+
+ The last ``/``-separated segment of the created execution's name is pushed to XCom
+ under the ``execution_id`` key, and the execution converted with ``Execution.to_dict``
+ is returned.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:WorkflowsCreateExecutionOperator`
+
+ :param execution: Required. Execution to be created.
+ :type execution: Dict
+ :param workflow_id: Required. The ID of the workflow.
+ :type workflow_id: str
+ :param project_id: The ID of the Google Cloud project the workflow belongs to.
+ Defaults to ``None``, which is forwarded to ``WorkflowsHook`` as-is.
+ :type project_id: str
+ :param location: Required. The GCP region in which to handle the request.
+ :type location: str
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ :param gcp_conn_id: The connection ID passed to ``WorkflowsHook``.
+ :type gcp_conn_id: str
+ :param impersonation_chain: Optional impersonation chain (a string or a sequence of strings)
+ passed unchanged to ``WorkflowsHook``.
+ :type impersonation_chain: Union[str, Sequence[str]]
+ """
+
+ template_fields = ("location", "workflow_id", "execution")
+ template_fields_renderers = {"execution": "json"}
+
+ def __init__(
+ self,
+ *,
+ workflow_id: str,
+ execution: Dict,
+ location: str,
+ project_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+
+ self.workflow_id = workflow_id
+ self.execution = execution
+ self.location = location
+ self.project_id = project_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context):
+ hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+ self.log.info("Creating execution")
+ execution = hook.create_execution(
+ workflow_id=self.workflow_id,
+ execution=self.execution,
+ location=self.location,
+ project_id=self.project_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ # Keep only the trailing path segment of the execution name as the short execution ID.
+ execution_id = execution.name.split("/")[-1]
+ self.xcom_push(context, key="execution_id", value=execution_id)
+ return Execution.to_dict(execution)
+
+
+class WorkflowsCancelExecutionOperator(BaseOperator):
+ """
+ Cancels an execution using the given ``workflow_id`` and ``execution_id``.
+
+ The operator returns the execution converted with ``Execution.to_dict``.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:WorkflowsCancelExecutionOperator`
+
+ :param workflow_id: Required. The ID of the workflow.
+ :type workflow_id: str
+ :param execution_id: Required. The ID of the execution.
+ :type execution_id: str
+ :param project_id: The ID of the Google Cloud project the workflow belongs to.
+ Defaults to ``None``, which is forwarded to ``WorkflowsHook`` as-is.
+ :type project_id: str
+ :param location: Required. The GCP region in which to handle the request.
+ :type location: str
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ :param gcp_conn_id: The connection ID passed to ``WorkflowsHook``.
+ :type gcp_conn_id: str
+ :param impersonation_chain: Optional impersonation chain (a string or a sequence of strings)
+ passed unchanged to ``WorkflowsHook``.
+ :type impersonation_chain: Union[str, Sequence[str]]
+ """
+
+ template_fields = ("location", "workflow_id", "execution_id")
+
+ def __init__(
+ self,
+ *,
+ workflow_id: str,
+ execution_id: str,
+ location: str,
+ project_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+
+ self.workflow_id = workflow_id
+ self.execution_id = execution_id
+ self.location = location
+ self.project_id = project_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context):
+ hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+ self.log.info("Canceling execution %s", self.execution_id)
+ execution = hook.cancel_execution(
+ workflow_id=self.workflow_id,
+ execution_id=self.execution_id,
+ location=self.location,
+ project_id=self.project_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ return Execution.to_dict(execution)
+
+
+class WorkflowsListExecutionsOperator(BaseOperator):
+ """
+ Returns a list of executions which belong to the
+ workflow with the given name. The method returns
+ executions of all workflow revisions. Returned
+ executions are ordered by their start time (newest
+ first).
+
+ Filtering by ``start_date_filter`` is applied by this operator to the results returned by
+ the hook; each remaining execution is converted with ``Execution.to_dict``.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:WorkflowsListExecutionsOperator`
+
+ :param workflow_id: Required. The ID of the workflow whose executions are listed.
+ :type workflow_id: str
+ :param start_date_filter: If passed, only executions whose start time is later than this date
+ are returned. If omitted, it defaults to 60 minutes before the moment the operator is
+ instantiated (the default is computed in ``__init__``, not when the task runs).
+ :type start_date_filter: datetime
+ :param project_id: The ID of the Google Cloud project the workflow belongs to.
+ Defaults to ``None``, which is forwarded to ``WorkflowsHook`` as-is.
+ :type project_id: str
+ :param location: Required. The GCP region in which to handle the request.
+ :type location: str
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ :param gcp_conn_id: The connection ID passed to ``WorkflowsHook``.
+ :type gcp_conn_id: str
+ :param impersonation_chain: Optional impersonation chain (a string or a sequence of strings)
+ passed unchanged to ``WorkflowsHook``.
+ :type impersonation_chain: Union[str, Sequence[str]]
+ """
+
+ template_fields = ("location", "workflow_id")
+
+ def __init__(
+ self,
+ *,
+ workflow_id: str,
+ location: str,
+ start_date_filter: Optional[datetime] = None,
+ project_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+
+ self.workflow_id = workflow_id
+ self.location = location
+ # ``-`` binds tighter than ``or``: the fallback is "now (UTC) minus 60 minutes",
+ # evaluated once, here, when no filter is supplied.
+ self.start_date_filter = start_date_filter or datetime.now(tz=pytz.UTC) - timedelta(minutes=60)
+ self.project_id = project_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context):
+ hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+ self.log.info("Retrieving executions for workflow %s", self.workflow_id)
+ execution_iter = hook.list_executions(
+ workflow_id=self.workflow_id,
+ location=self.location,
+ project_id=self.project_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+
+ # Keep only executions that started strictly after the filter date.
+ return [Execution.to_dict(e) for e in execution_iter if e.start_time > self.start_date_filter]
+
+
+class WorkflowsGetExecutionOperator(BaseOperator):
+ """
+ Returns an execution for the given ``workflow_id`` and ``execution_id``.
+
+ The operator returns the execution converted with ``Execution.to_dict``.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:WorkflowsGetExecutionOperator`
+
+ :param workflow_id: Required. The ID of the workflow.
+ :type workflow_id: str
+ :param execution_id: Required. The ID of the execution.
+ :type execution_id: str
+ :param project_id: The ID of the Google Cloud project the workflow belongs to.
+ Defaults to ``None``, which is forwarded to ``WorkflowsHook`` as-is.
+ :type project_id: str
+ :param location: Required. The GCP region in which to handle the request.
+ :type location: str
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ :param gcp_conn_id: The connection ID passed to ``WorkflowsHook``.
+ :type gcp_conn_id: str
+ :param impersonation_chain: Optional impersonation chain (a string or a sequence of strings)
+ passed unchanged to ``WorkflowsHook``.
+ :type impersonation_chain: Union[str, Sequence[str]]
+ """
+
+ template_fields = ("location", "workflow_id", "execution_id")
+
+ def __init__(
+ self,
+ *,
+ workflow_id: str,
+ execution_id: str,
+ location: str,
+ project_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+
+ self.workflow_id = workflow_id
+ self.execution_id = execution_id
+ self.location = location
+ self.project_id = project_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context):
+ hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+ self.log.info("Retrieving execution %s for workflow %s", self.execution_id, self.workflow_id)
+ execution = hook.get_execution(
+ workflow_id=self.workflow_id,
+ execution_id=self.execution_id,
+ location=self.location,
+ project_id=self.project_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ return Execution.to_dict(execution)
diff --git a/airflow/providers/google/cloud/sensors/workflows.py b/airflow/providers/google/cloud/sensors/workflows.py
new file mode 100644
index 0000000..5950458
--- /dev/null
+++ b/airflow/providers/google/cloud/sensors/workflows.py
@@ -0,0 +1,123 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Optional, Sequence, Set, Tuple, Union
+
+from google.api_core.retry import Retry
+from google.cloud.workflows.executions_v1beta import Execution
+
+from airflow.exceptions import AirflowException
+from airflow.providers.google.cloud.hooks.workflows import WorkflowsHook
+from airflow.sensors.base import BaseSensorOperator
+
+
+class WorkflowExecutionSensor(BaseSensorOperator):
+    """
+    Checks state of an execution for the given ``workflow_id`` and ``execution_id``.
+
+    Each poke fetches the execution through ``WorkflowsHook.get_execution`` and compares its
+    state against ``failure_states`` first and ``success_states`` second.
+
+    :param workflow_id: Required. The ID of the workflow.
+    :type workflow_id: str
+    :param execution_id: Required. The ID of the execution.
+    :type execution_id: str
+    :param project_id: Required. The ID of the Google Cloud project the workflow belongs to.
+    :type project_id: str
+    :param location: Required. The GCP region in which to handle the request.
+    :type location: str
+    :param success_states: Execution states to be considered as successful, by default
+        it's only ``SUCCEEDED`` state
+    :type success_states: Set[Execution.State]
+    :param failure_states: Execution states to be considered as failures, by default
+        they are ``FAILED`` and ``CANCELLED`` states.
+    :type failure_states: Set[Execution.State]
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param request_timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type request_timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID passed to ``WorkflowsHook``.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional impersonation chain (a string or a sequence of strings)
+        passed unchanged to ``WorkflowsHook``.
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = ("location", "workflow_id", "execution_id")
+
+    def __init__(
+        self,
+        *,
+        workflow_id: str,
+        execution_id: str,
+        location: str,
+        project_id: str,
+        success_states: Optional[Set[Execution.State]] = None,
+        failure_states: Optional[Set[Execution.State]] = None,
+        retry: Optional[Retry] = None,
+        request_timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+
+        # ``or`` means an empty collection also falls back to the defaults.
+        self.success_states = success_states or {Execution.State.SUCCEEDED}
+        self.failure_states = failure_states or {Execution.State.FAILED, Execution.State.CANCELLED}
+        self.workflow_id = workflow_id
+        self.execution_id = execution_id
+        self.location = location
+        self.project_id = project_id
+        self.retry = retry
+        self.request_timeout = request_timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def poke(self, context):
+        """
+        Fetch the execution once and report whether it reached a success state.
+
+        :param context: Task context supplied by the caller; not used by this method.
+        :return: ``True`` if the execution state is in ``success_states``, ``False`` otherwise.
+        :raises AirflowException: If the execution state is in ``failure_states``.
+        """
+        hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Checking state of execution %s for workflow %s", self.execution_id, self.workflow_id)
+        execution: Execution = hook.get_execution(
+            workflow_id=self.workflow_id,
+            execution_id=self.execution_id,
+            location=self.location,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.request_timeout,
+            metadata=self.metadata,
+        )
+
+        state = execution.state
+        # Failure states are checked first, so a state listed in both sets is treated as a failure.
+        if state in self.failure_states:
+            # The workflow placeholder must show the workflow ID (it previously repeated the
+            # execution ID, which made the message misleading).
+            raise AirflowException(
+                f"Execution {self.execution_id} for workflow {self.workflow_id} "
+                f"failed and is in `{state}` state",
+            )
+
+        if state in self.success_states:
+            self.log.info(
+                "Execution %s for workflow %s completed with state: %s",
+                self.execution_id,
+                self.workflow_id,
+                state,
+            )
+            return True
+
+        self.log.info(
+            "Execution %s for workflow %s does not completed yet, current state: %s",
+            self.execution_id,
+            self.workflow_id,
+            state,
+        )
+        return False
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 67b7af4..39ba434 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -301,6 +301,11 @@ integrations:
- /docs/apache-airflow-providers-google/operators/cloud/natural_language.rst
logo: /integration-logos/gcp/Cloud-NLP.png
tags: [gcp]
+ - integration-name: Google Cloud Workflows
+ external-doc-url: https://cloud.google.com/workflows/
+ how-to-guide:
+ - /docs/apache-airflow-providers-google/operators/cloud/workflows.rst
+ tags: [gcp]
operators:
- integration-name: Google Ads
@@ -401,6 +406,9 @@ operators:
- integration-name: Google Cloud Vision
python-modules:
- airflow.providers.google.cloud.operators.vision
+ - integration-name: Google Cloud Workflows
+ python-modules:
+ - airflow.providers.google.cloud.operators.workflows
- integration-name: Google Cloud Firestore
python-modules:
- airflow.providers.google.firebase.operators.firestore
@@ -445,6 +453,9 @@ sensors:
- integration-name: Google Cloud Pub/Sub
python-modules:
- airflow.providers.google.cloud.sensors.pubsub
+ - integration-name: Google Cloud Workflows
+ python-modules:
+ - airflow.providers.google.cloud.sensors.workflows
- integration-name: Google Campaign Manager
python-modules:
- airflow.providers.google.marketing_platform.sensors.campaign_manager
@@ -565,6 +576,9 @@ hooks:
- integration-name: Google Cloud Vision
python-modules:
- airflow.providers.google.cloud.hooks.vision
+ - integration-name: Google Cloud Workflows
+ python-modules:
+ - airflow.providers.google.cloud.hooks.workflows
- integration-name: Google
python-modules:
- airflow.providers.google.common.hooks.base_google
diff --git a/docs/apache-airflow-providers-google/operators/cloud/workflows.rst b/docs/apache-airflow-providers-google/operators/cloud/workflows.rst
new file mode 100644
index 0000000..551a7ca
--- /dev/null
+++ b/docs/apache-airflow-providers-google/operators/cloud/workflows.rst
@@ -0,0 +1,185 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+Google Cloud Workflows Operators
+================================
+
+You can use Workflows to create serverless workflows that link series of serverless tasks together
+in an order you define. Combine the power of Google Cloud's APIs, serverless products like Cloud
+Functions and Cloud Run, and calls to external APIs to create flexible serverless applications.
+
+For more information about the service visit
+`Workflows product documentation <https://cloud.google.com/workflows/docs/overview>`__.
+
+.. contents::
+ :depth: 1
+ :local:
+
+Prerequisite Tasks
+------------------
+
+.. include:: /operators/_partials/prerequisite_tasks.rst
+
+
+.. _howto/operator:WorkflowsCreateWorkflowOperator:
+
+Create workflow
+===============
+
+To create a workflow use
+:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsCreateWorkflowOperator`.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_create_workflow]
+ :end-before: [END how_to_create_workflow]
+
+The workflow should be defined in a similar way to this example:
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+ :language: python
+ :dedent: 0
+ :start-after: [START how_to_define_workflow]
+ :end-before: [END how_to_define_workflow]
+
+For more information about authoring workflows check the official
+`product documentation <https://cloud.google.com/workflows/docs/overview>`__.
+
+
+.. _howto/operator:WorkflowsUpdateWorkflowOperator:
+
+Update workflow
+===============
+
+To update a workflow use
+:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsUpdateWorkflowOperator`.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_update_workflow]
+ :end-before: [END how_to_update_workflow]
+
+.. _howto/operator:WorkflowsGetWorkflowOperator:
+
+Get workflow
+============
+
+To get a workflow use
+:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsGetWorkflowOperator`.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_get_workflow]
+ :end-before: [END how_to_get_workflow]
+
+.. _howto/operator:WorkflowsListWorkflowsOperator:
+
+List workflows
+==============
+
+To list workflows use
+:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsListWorkflowsOperator`.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_list_workflows]
+ :end-before: [END how_to_list_workflows]
+
+.. _howto/operator:WorkflowsDeleteWorkflowOperator:
+
+Delete workflow
+===============
+
+To delete a workflow use
+:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsDeleteWorkflowOperator`.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_delete_workflow]
+ :end-before: [END how_to_delete_workflow]
+
+.. _howto/operator:WorkflowsCreateExecutionOperator:
+
+Create execution
+================
+
+To create an execution use
+:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsCreateExecutionOperator`.
+This operator is not idempotent due to API limitation.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_create_execution]
+ :end-before: [END how_to_create_execution]
+
+The create operator does not wait for execution to complete. To wait for execution result use
+:class:`~airflow.providers.google.cloud.sensors.workflows.WorkflowExecutionSensor`.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_wait_for_execution]
+ :end-before: [END how_to_wait_for_execution]
+
+.. _howto/operator:WorkflowsGetExecutionOperator:
+
+Get execution
+================
+
+To get an execution use
+:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsGetExecutionOperator`.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_get_execution]
+ :end-before: [END how_to_get_execution]
+
+.. _howto/operator:WorkflowsListExecutionsOperator:
+
+List executions
+===============
+
+To list executions use
+:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsListExecutionsOperator`.
+By default this operator returns only executions started within the last 60 minutes.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_list_executions]
+ :end-before: [END how_to_list_executions]
+
+.. _howto/operator:WorkflowsCancelExecutionOperator:
+
+Cancel execution
+================
+
+To cancel an execution use
+:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsCancelExecutionOperator`.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_cancel_execution]
+ :end-before: [END how_to_cancel_execution]
diff --git a/setup.py b/setup.py
index 6e2a2f7..67e6a85 100644
--- a/setup.py
+++ b/setup.py
@@ -278,6 +278,7 @@ flask_oauth = [
google = [
'PyOpenSSL',
'google-ads>=4.0.0,<8.0.0',
+ 'google-api-core>=1.25.1,<2.0.0',
'google-api-python-client>=1.6.0,<2.0.0',
'google-auth>=1.0.0,<2.0.0',
'google-auth-httplib2>=0.0.1',
@@ -305,6 +306,7 @@ google = [
'google-cloud-translate>=1.5.0,<2.0.0',
'google-cloud-videointelligence>=1.7.0,<2.0.0',
'google-cloud-vision>=0.35.2,<2.0.0',
+ 'google-cloud-workflows>=0.1.0,<2.0.0',
'grpcio-gcp>=0.2.2',
'json-merge-patch~=0.2',
'pandas-gbq',
diff --git a/tests/providers/google/cloud/hooks/test_workflows.py b/tests/providers/google/cloud/hooks/test_workflows.py
new file mode 100644
index 0000000..4f3d4d0
--- /dev/null
+++ b/tests/providers/google/cloud/hooks/test_workflows.py
@@ -0,0 +1,256 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import mock
+
+from airflow.providers.google.cloud.hooks.workflows import WorkflowsHook
+
+# Dotted-path template used to build ``mock.patch`` targets inside the hook module.
+BASE_PATH = "airflow.providers.google.cloud.hooks.workflows.{}"
+# Placeholder argument values; the tests only check that they reach the mocked
+# API clients unchanged.
+LOCATION = "europe-west1"
+WORKFLOW_ID = "workflow_id"
+EXECUTION_ID = "execution_id"
+WORKFLOW = {"aa": "bb"}
+EXECUTION = {"ccc": "ddd"}
+PROJECT_ID = "airflow-testing"
+METADATA = ()
+TIMEOUT = None
+RETRY = None
+FILTER_ = "aaaa"
+ORDER_BY = "bbb"
+UPDATE_MASK = "aaa,bbb"
+
+# Resource paths the tests expect to find in the request dicts sent to the clients.
+WORKFLOW_PARENT = f"projects/{PROJECT_ID}/locations/{LOCATION}"
+WORKFLOW_NAME = f"projects/{PROJECT_ID}/locations/{LOCATION}/workflows/{WORKFLOW_ID}"
+EXECUTION_PARENT = f"projects/{PROJECT_ID}/locations/{LOCATION}/workflows/{WORKFLOW_ID}"
+EXECUTION_NAME = (
+ f"projects/{PROJECT_ID}/locations/{LOCATION}/workflows/{WORKFLOW_ID}/executions/{EXECUTION_ID}"
+)
+
+
+def mock_init(*args, **kwargs):
+ pass
+
+
+class TestWorkflowsHook:
+ def setup_method(self, _):
+ with mock.patch(BASE_PATH.format("GoogleBaseHook.__init__"), new=mock_init):
+ self.hook = WorkflowsHook(gcp_conn_id="test") # pylint: disable=attribute-defined-outside-init
+
+ @mock.patch(BASE_PATH.format("WorkflowsHook._get_credentials"))
+ @mock.patch(BASE_PATH.format("WorkflowsHook.client_info"), new_callable=mock.PropertyMock)
+ @mock.patch(BASE_PATH.format("WorkflowsClient"))
+ def test_get_workflows_client(self, mock_client, mock_client_info, mock_get_credentials):
+ self.hook.get_workflows_client()
+ mock_client.assert_called_once_with(
+ credentials=mock_get_credentials.return_value,
+ client_info=mock_client_info.return_value,
+ )
+
+ @mock.patch(BASE_PATH.format("WorkflowsHook._get_credentials"))
+ @mock.patch(BASE_PATH.format("WorkflowsHook.client_info"), new_callable=mock.PropertyMock)
+ @mock.patch(BASE_PATH.format("ExecutionsClient"))
+ def test_get_executions_client(self, mock_client, mock_client_info, mock_get_credentials):
+ self.hook.get_executions_client()
+ mock_client.assert_called_once_with(
+ credentials=mock_get_credentials.return_value,
+ client_info=mock_client_info.return_value,
+ )
+
+ @mock.patch(BASE_PATH.format("WorkflowsHook.get_workflows_client"))
+ def test_create_workflow(self, mock_client):
+ result = self.hook.create_workflow(
+ workflow=WORKFLOW,
+ workflow_id=WORKFLOW_ID,
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
+
+ assert mock_client.return_value.create_workflow.return_value == result
+ mock_client.return_value.create_workflow.assert_called_once_with(
+ request=dict(workflow=WORKFLOW, workflow_id=WORKFLOW_ID, parent=WORKFLOW_PARENT),
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
+
+ @mock.patch(BASE_PATH.format("WorkflowsHook.get_workflows_client"))
+ def test_get_workflow(self, mock_client):
+ result = self.hook.get_workflow(
+ workflow_id=WORKFLOW_ID,
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
+
+ assert mock_client.return_value.get_workflow.return_value == result
+ mock_client.return_value.get_workflow.assert_called_once_with(
+ request=dict(name=WORKFLOW_NAME),
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
+
+ @mock.patch(BASE_PATH.format("WorkflowsHook.get_workflows_client"))
+ def test_update_workflow(self, mock_client):
+ result = self.hook.update_workflow(
+ workflow=WORKFLOW,
+ update_mask=UPDATE_MASK,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
+
+ assert mock_client.return_value.update_workflow.return_value == result
+ mock_client.return_value.update_workflow.assert_called_once_with(
+ request=dict(
+ workflow=WORKFLOW,
+ update_mask=UPDATE_MASK,
+ ),
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
+
+ @mock.patch(BASE_PATH.format("WorkflowsHook.get_workflows_client"))
+ def test_delete_workflow(self, mock_client):
+ result = self.hook.delete_workflow(
+ workflow_id=WORKFLOW_ID,
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
+
+ assert mock_client.return_value.delete_workflow.return_value == result
+ mock_client.return_value.delete_workflow.assert_called_once_with(
+ request=dict(name=WORKFLOW_NAME),
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
+
+ @mock.patch(BASE_PATH.format("WorkflowsHook.get_workflows_client"))
+ def test_list_workflows(self, mock_client):
+ result = self.hook.list_workflows(
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ filter_=FILTER_,
+ order_by=ORDER_BY,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
+
+ assert mock_client.return_value.list_workflows.return_value == result
+ mock_client.return_value.list_workflows.assert_called_once_with(
+ request=dict(
+ parent=WORKFLOW_PARENT,
+ filter=FILTER_,
+ order_by=ORDER_BY,
+ ),
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
+
+ @mock.patch(BASE_PATH.format("WorkflowsHook.get_executions_client"))
+ def test_create_execution(self, mock_client):
+ result = self.hook.create_execution(
+ workflow_id=WORKFLOW_ID,
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ execution=EXECUTION,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
+
+ assert mock_client.return_value.create_execution.return_value == result
+ mock_client.return_value.create_execution.assert_called_once_with(
+ request=dict(
+ parent=EXECUTION_PARENT,
+ execution=EXECUTION,
+ ),
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
+
+ @mock.patch(BASE_PATH.format("WorkflowsHook.get_executions_client"))
+ def test_get_execution(self, mock_client):
+ result = self.hook.get_execution(
+ workflow_id=WORKFLOW_ID,
+ execution_id=EXECUTION_ID,
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
+
+ assert mock_client.return_value.get_execution.return_value == result
+ mock_client.return_value.get_execution.assert_called_once_with(
+ request=dict(name=EXECUTION_NAME),
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
+
+ @mock.patch(BASE_PATH.format("WorkflowsHook.get_executions_client"))
+ def test_cancel_execution(self, mock_client):
+ result = self.hook.cancel_execution(
+ workflow_id=WORKFLOW_ID,
+ execution_id=EXECUTION_ID,
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
+
+ assert mock_client.return_value.cancel_execution.return_value == result
+ mock_client.return_value.cancel_execution.assert_called_once_with(
+ request=dict(name=EXECUTION_NAME),
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
+
+ @mock.patch(BASE_PATH.format("WorkflowsHook.get_executions_client"))
+ def test_list_execution(self, mock_client):
+ result = self.hook.list_executions(
+ workflow_id=WORKFLOW_ID,
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
+
+ assert mock_client.return_value.list_executions.return_value == result
+ mock_client.return_value.list_executions.assert_called_once_with(
+ request=dict(parent=EXECUTION_PARENT),
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
diff --git a/tests/providers/google/cloud/operators/test_workflows.py b/tests/providers/google/cloud/operators/test_workflows.py
new file mode 100644
index 0000000..5578548
--- /dev/null
+++ b/tests/providers/google/cloud/operators/test_workflows.py
@@ -0,0 +1,383 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import datetime
+from unittest import mock
+
+import pytz
+
+from airflow.providers.google.cloud.operators.workflows import (
+ WorkflowsCancelExecutionOperator,
+ WorkflowsCreateExecutionOperator,
+ WorkflowsCreateWorkflowOperator,
+ WorkflowsDeleteWorkflowOperator,
+ WorkflowsGetExecutionOperator,
+ WorkflowsGetWorkflowOperator,
+ WorkflowsListExecutionsOperator,
+ WorkflowsListWorkflowsOperator,
+ WorkflowsUpdateWorkflowOperator,
+)
+
+# Dotted-path template used to build ``mock.patch`` targets inside the operators module.
+BASE_PATH = "airflow.providers.google.cloud.operators.workflows.{}"
+# Placeholder operator arguments; the tests only check that they are handed on
+# to the mocked hook unchanged.
+LOCATION = "europe-west1"
+WORKFLOW_ID = "workflow_id"
+EXECUTION_ID = "execution_id"
+WORKFLOW = {"aa": "bb"}
+EXECUTION = {"ccc": "ddd"}
+PROJECT_ID = "airflow-testing"
+METADATA = None
+TIMEOUT = None
+RETRY = None
+FILTER_ = "aaaa"
+ORDER_BY = "bbb"
+UPDATE_MASK = "aaa,bbb"
+# Arguments the operators are expected to use when constructing the hook.
+GCP_CONN_ID = "test-conn"
+IMPERSONATION_CHAIN = None
+
+
+class TestWorkflowsCreateWorkflowOperator:
+ @mock.patch(BASE_PATH.format("Workflow"))
+ @mock.patch(BASE_PATH.format("WorkflowsHook"))
+ def test_execute(self, mock_hook, mock_object):
+ op = WorkflowsCreateWorkflowOperator(
+ task_id="test_task",
+ workflow=WORKFLOW,
+ workflow_id=WORKFLOW_ID,
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ result = op.execute({})
+
+ mock_hook.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+
+ mock_hook.return_value.create_workflow.assert_called_once_with(
+ workflow=WORKFLOW,
+ workflow_id=WORKFLOW_ID,
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
+
+ assert result == mock_object.to_dict.return_value
+
+
+class TestWorkflowsUpdateWorkflowOperator:
+ @mock.patch(BASE_PATH.format("Workflow"))
+ @mock.patch(BASE_PATH.format("WorkflowsHook"))
+ def test_execute(self, mock_hook, mock_object):
+ op = WorkflowsUpdateWorkflowOperator(
+ task_id="test_task",
+ workflow_id=WORKFLOW_ID,
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ update_mask=UPDATE_MASK,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ result = op.execute({})
+
+ mock_hook.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+
+ mock_hook.return_value.get_workflow.assert_called_once_with(
+ workflow_id=WORKFLOW_ID,
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
+
+ mock_hook.return_value.update_workflow.assert_called_once_with(
+ workflow=mock_hook.return_value.get_workflow.return_value,
+ update_mask=UPDATE_MASK,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
+
+ assert result == mock_object.to_dict.return_value
+
+
+class TestWorkflowsDeleteWorkflowOperator:
+ @mock.patch(BASE_PATH.format("WorkflowsHook"))
+ def test_execute(
+ self,
+ mock_hook,
+ ):
+ op = WorkflowsDeleteWorkflowOperator(
+ task_id="test_task",
+ workflow_id=WORKFLOW_ID,
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ op.execute({})
+
+ mock_hook.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+
+ mock_hook.return_value.delete_workflow.assert_called_once_with(
+ workflow_id=WORKFLOW_ID,
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
+
+
+class TestWorkflowsListWorkflowsOperator:
+ @mock.patch(BASE_PATH.format("Workflow"))
+ @mock.patch(BASE_PATH.format("WorkflowsHook"))
+ def test_execute(self, mock_hook, mock_object):
+ workflow_mock = mock.MagicMock()
+ workflow_mock.start_time = datetime.datetime.now(tz=pytz.UTC) + datetime.timedelta(minutes=5)
+ mock_hook.return_value.list_workflows.return_value = [workflow_mock]
+
+ op = WorkflowsListWorkflowsOperator(
+ task_id="test_task",
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ filter_=FILTER_,
+ order_by=ORDER_BY,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ result = op.execute({})
+
+ mock_hook.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+
+ mock_hook.return_value.list_workflows.assert_called_once_with(
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ filter_=FILTER_,
+ order_by=ORDER_BY,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
+
+ assert result == [mock_object.to_dict.return_value]
+
+
+class TestWorkflowsGetWorkflowOperator:
+ @mock.patch(BASE_PATH.format("Workflow"))
+ @mock.patch(BASE_PATH.format("WorkflowsHook"))
+ def test_execute(self, mock_hook, mock_object):
+ op = WorkflowsGetWorkflowOperator(
+ task_id="test_task",
+ workflow_id=WORKFLOW_ID,
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ result = op.execute({})
+
+ mock_hook.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+
+ mock_hook.return_value.get_workflow.assert_called_once_with(
+ workflow_id=WORKFLOW_ID,
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
+
+ assert result == mock_object.to_dict.return_value
+
+
+class TestWorkflowExecutionsCreateExecutionOperator:
+ @mock.patch(BASE_PATH.format("Execution"))
+ @mock.patch(BASE_PATH.format("WorkflowsHook"))
+ @mock.patch(BASE_PATH.format("WorkflowsCreateExecutionOperator.xcom_push"))
+ def test_execute(self, mock_xcom, mock_hook, mock_object):
+ mock_hook.return_value.create_execution.return_value.name = "name/execution_id"
+ op = WorkflowsCreateExecutionOperator(
+ task_id="test_task",
+ workflow_id=WORKFLOW_ID,
+ execution=EXECUTION,
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ result = op.execute({})
+
+ mock_hook.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+
+ mock_hook.return_value.create_execution.assert_called_once_with(
+ workflow_id=WORKFLOW_ID,
+ execution=EXECUTION,
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
+ mock_xcom.assert_called_once_with({}, key="execution_id", value="execution_id")
+ assert result == mock_object.to_dict.return_value
+
+
+class TestWorkflowExecutionsCancelExecutionOperator:
+ @mock.patch(BASE_PATH.format("Execution"))
+ @mock.patch(BASE_PATH.format("WorkflowsHook"))
+ def test_execute(self, mock_hook, mock_object):
+ op = WorkflowsCancelExecutionOperator(
+ task_id="test_task",
+ workflow_id=WORKFLOW_ID,
+ execution_id=EXECUTION_ID,
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ result = op.execute({})
+
+ mock_hook.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+
+ mock_hook.return_value.cancel_execution.assert_called_once_with(
+ workflow_id=WORKFLOW_ID,
+ execution_id=EXECUTION_ID,
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
+
+ assert result == mock_object.to_dict.return_value
+
+
+class TestWorkflowExecutionsListExecutionsOperator:
+ @mock.patch(BASE_PATH.format("Execution"))
+ @mock.patch(BASE_PATH.format("WorkflowsHook"))
+ def test_execute(self, mock_hook, mock_object):
+ execution_mock = mock.MagicMock()
+ execution_mock.start_time = datetime.datetime.now(tz=pytz.UTC) + datetime.timedelta(minutes=5)
+ mock_hook.return_value.list_executions.return_value = [execution_mock]
+
+ op = WorkflowsListExecutionsOperator(
+ task_id="test_task",
+ workflow_id=WORKFLOW_ID,
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ result = op.execute({})
+
+ mock_hook.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+
+ mock_hook.return_value.list_executions.assert_called_once_with(
+ workflow_id=WORKFLOW_ID,
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
+
+ assert result == [mock_object.to_dict.return_value]
+
+
+class TestWorkflowExecutionsGetExecutionOperator:
+ @mock.patch(BASE_PATH.format("Execution"))
+ @mock.patch(BASE_PATH.format("WorkflowsHook"))
+ def test_execute(self, mock_hook, mock_object):
+ op = WorkflowsGetExecutionOperator(
+ task_id="test_task",
+ workflow_id=WORKFLOW_ID,
+ execution_id=EXECUTION_ID,
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ result = op.execute({})
+
+ mock_hook.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+
+ mock_hook.return_value.get_execution.assert_called_once_with(
+ workflow_id=WORKFLOW_ID,
+ execution_id=EXECUTION_ID,
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
+
+ assert result == mock_object.to_dict.return_value
diff --git a/tests/providers/google/cloud/operators/test_workflows_system.py b/tests/providers/google/cloud/operators/test_workflows_system.py
new file mode 100644
index 0000000..0a768ed
--- /dev/null
+++ b/tests/providers/google/cloud/operators/test_workflows_system.py
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pytest
+
+from tests.providers.google.cloud.utils.gcp_authenticator import GCP_WORKFLOWS_KEY
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
+
+
+# System test that runs the 'example_cloud_workflows' DAG from CLOUD_DAG_FOLDER
+# inside the GCP context provided for GCP_WORKFLOWS_KEY.
+# NOTE(review): the class name mentions Cloud Vision although the test uses the
+# Workflows key and DAG -- looks like a copy-paste leftover; consider renaming.
+@pytest.mark.system("google.cloud")
+@pytest.mark.credential_file(GCP_WORKFLOWS_KEY)
+class CloudVisionExampleDagsSystemTest(GoogleSystemTest):
+ @provide_gcp_context(GCP_WORKFLOWS_KEY)
+ def test_run_example_workflow_dag(self):
+ self.run_dag('example_cloud_workflows', CLOUD_DAG_FOLDER)
diff --git a/tests/providers/google/cloud/sensors/test_workflows.py b/tests/providers/google/cloud/sensors/test_workflows.py
new file mode 100644
index 0000000..56ad958
--- /dev/null
+++ b/tests/providers/google/cloud/sensors/test_workflows.py
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import mock
+
+import pytest
+from google.cloud.workflows.executions_v1beta import Execution
+
+from airflow.exceptions import AirflowException
+from airflow.providers.google.cloud.sensors.workflows import WorkflowExecutionSensor
+
+# Dotted-path template used to build ``mock.patch`` targets inside the sensors module.
+BASE_PATH = "airflow.providers.google.cloud.sensors.workflows.{}"
+# Placeholder sensor arguments shared by every test in this module.
+LOCATION = "europe-west1"
+WORKFLOW_ID = "workflow_id"
+EXECUTION_ID = "execution_id"
+PROJECT_ID = "airflow-testing"
+METADATA = None
+TIMEOUT = None
+RETRY = None
+# Arguments the sensor is expected to use when constructing the hook.
+GCP_CONN_ID = "test-conn"
+IMPERSONATION_CHAIN = None
+
+
+class TestWorkflowExecutionSensor:
+ @mock.patch(BASE_PATH.format("WorkflowsHook"))
+ def test_poke_success(self, mock_hook):
+ mock_hook.return_value.get_execution.return_value = mock.MagicMock(state=Execution.State.SUCCEEDED)
+ op = WorkflowExecutionSensor(
+ task_id="test_task",
+ workflow_id=WORKFLOW_ID,
+ execution_id=EXECUTION_ID,
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ retry=RETRY,
+ request_timeout=TIMEOUT,
+ metadata=METADATA,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ result = op.poke({})
+
+ mock_hook.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+
+ mock_hook.return_value.get_execution.assert_called_once_with(
+ workflow_id=WORKFLOW_ID,
+ execution_id=EXECUTION_ID,
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ )
+
+ assert result is True
+
+ @mock.patch(BASE_PATH.format("WorkflowsHook"))
+ def test_poke_wait(self, mock_hook):
+ mock_hook.return_value.get_execution.return_value = mock.MagicMock(state=Execution.State.ACTIVE)
+ op = WorkflowExecutionSensor(
+ task_id="test_task",
+ workflow_id=WORKFLOW_ID,
+ execution_id=EXECUTION_ID,
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ retry=RETRY,
+ request_timeout=TIMEOUT,
+ metadata=METADATA,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ result = op.poke({})
+
+ assert result is False
+
+ @mock.patch(BASE_PATH.format("WorkflowsHook"))
+ def test_poke_failure(self, mock_hook):
+ mock_hook.return_value.get_execution.return_value = mock.MagicMock(state=Execution.State.FAILED)
+ op = WorkflowExecutionSensor(
+ task_id="test_task",
+ workflow_id=WORKFLOW_ID,
+ execution_id=EXECUTION_ID,
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ retry=RETRY,
+ request_timeout=TIMEOUT,
+ metadata=METADATA,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ with pytest.raises(AirflowException):
+ op.poke({})
diff --git a/tests/providers/google/cloud/utils/gcp_authenticator.py b/tests/providers/google/cloud/utils/gcp_authenticator.py
index 8aa55b8..8cf222e 100644
--- a/tests/providers/google/cloud/utils/gcp_authenticator.py
+++ b/tests/providers/google/cloud/utils/gcp_authenticator.py
@@ -54,6 +54,7 @@ GCP_SECRET_MANAGER_KEY = 'gcp_secret_manager.json'
GCP_SPANNER_KEY = 'gcp_spanner.json'
GCP_STACKDRIVER = 'gcp_stackdriver.json'
GCP_TASKS_KEY = 'gcp_tasks.json'
+GCP_WORKFLOWS_KEY = "gcp_workflows.json"
GMP_KEY = 'gmp.json'
G_FIREBASE_KEY = 'g_firebase.json'
GCP_AWS_KEY = 'gcp_aws.json'