Posted to commits@airflow.apache.org by tu...@apache.org on 2021/01/28 19:35:20 UTC

[airflow] branch master updated: Add Google Cloud Workflows Operators (#13366)

This is an automated email from the ASF dual-hosted git repository.

turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d6588f  Add Google Cloud Workflows Operators (#13366)
6d6588f is described below

commit 6d6588fe2b8bb5fa33e930646d963df3e0530f23
Author: Tomek Urbaszek <tu...@gmail.com>
AuthorDate: Thu Jan 28 20:35:09 2021 +0100

    Add Google Cloud Workflows Operators (#13366)
    
    Add Google Cloud Workflows Operators, system test, example and sensor
    
    Co-authored-by: Tobiasz Kędzierski <to...@polidea.com>
---
 .../google/cloud/example_dags/example_workflows.py | 197 ++++++
 airflow/providers/google/cloud/hooks/workflows.py  | 401 ++++++++++++
 .../providers/google/cloud/operators/workflows.py  | 714 +++++++++++++++++++++
 .../providers/google/cloud/sensors/workflows.py    | 123 ++++
 airflow/providers/google/provider.yaml             |  14 +
 .../operators/cloud/workflows.rst                  | 185 ++++++
 setup.py                                           |   2 +
 .../providers/google/cloud/hooks/test_workflows.py | 256 ++++++++
 .../google/cloud/operators/test_workflows.py       | 383 +++++++++++
 .../cloud/operators/test_workflows_system.py       |  29 +
 .../google/cloud/sensors/test_workflows.py         | 108 ++++
 .../google/cloud/utils/gcp_authenticator.py        |   1 +
 12 files changed, 2413 insertions(+)

diff --git a/airflow/providers/google/cloud/example_dags/example_workflows.py b/airflow/providers/google/cloud/example_dags/example_workflows.py
new file mode 100644
index 0000000..0fab435
--- /dev/null
+++ b/airflow/providers/google/cloud/example_dags/example_workflows.py
@@ -0,0 +1,197 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+
+from airflow import DAG
+from airflow.providers.google.cloud.operators.workflows import (
+    WorkflowsCancelExecutionOperator,
+    WorkflowsCreateExecutionOperator,
+    WorkflowsCreateWorkflowOperator,
+    WorkflowsDeleteWorkflowOperator,
+    WorkflowsGetExecutionOperator,
+    WorkflowsGetWorkflowOperator,
+    WorkflowsListExecutionsOperator,
+    WorkflowsListWorkflowsOperator,
+    WorkflowsUpdateWorkflowOperator,
+)
+from airflow.providers.google.cloud.sensors.workflows import WorkflowExecutionSensor
+from airflow.utils.dates import days_ago
+
+LOCATION = os.environ.get("GCP_WORKFLOWS_LOCATION", "us-central1")
+PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "an-id")
+
+WORKFLOW_ID = os.getenv("GCP_WORKFLOWS_WORKFLOW_ID", "airflow-test-workflow")
+
+# [START how_to_define_workflow]
+WORKFLOW_CONTENT = """
+- getCurrentTime:
+    call: http.get
+    args:
+        url: https://us-central1-workflowsample.cloudfunctions.net/datetime
+    result: currentTime
+- readWikipedia:
+    call: http.get
+    args:
+        url: https://en.wikipedia.org/w/api.php
+        query:
+            action: opensearch
+            search: ${currentTime.body.dayOfTheWeek}
+    result: wikiResult
+- returnResult:
+    return: ${wikiResult.body[1]}
+"""
+
+WORKFLOW = {
+    "description": "Test workflow",
+    "labels": {"airflow-version": "dev"},
+    "source_contents": WORKFLOW_CONTENT,
+}
+# [END how_to_define_workflow]
+
+EXECUTION = {"argument": ""}
+
+SLEEP_WORKFLOW_ID = os.getenv("GCP_WORKFLOWS_SLEEP_WORKFLOW_ID", "sleep_workflow")
+SLEEP_WORKFLOW_CONTENT = """
+- someSleep:
+    call: sys.sleep
+    args:
+        seconds: 120
+"""
+
+SLEEP_WORKFLOW = {
+    "description": "Test workflow",
+    "labels": {"airflow-version": "dev"},
+    "source_contents": SLEEP_WORKFLOW_CONTENT,
+}
+
+
+with DAG("example_cloud_workflows", start_date=days_ago(1), schedule_interval=None) as dag:
+    # [START how_to_create_workflow]
+    create_workflow = WorkflowsCreateWorkflowOperator(
+        task_id="create_workflow",
+        location=LOCATION,
+        project_id=PROJECT_ID,
+        workflow=WORKFLOW,
+        workflow_id=WORKFLOW_ID,
+    )
+    # [END how_to_create_workflow]
+
+    # [START how_to_update_workflow]
+    update_workflows = WorkflowsUpdateWorkflowOperator(
+        task_id="update_workflows",
+        location=LOCATION,
+        project_id=PROJECT_ID,
+        workflow_id=WORKFLOW_ID,
+        update_mask={"paths": ["name", "description"]},
+    )
+    # [END how_to_update_workflow]
+
+    # [START how_to_get_workflow]
+    get_workflow = WorkflowsGetWorkflowOperator(
+        task_id="get_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
+    )
+    # [END how_to_get_workflow]
+
+    # [START how_to_list_workflows]
+    list_workflows = WorkflowsListWorkflowsOperator(
+        task_id="list_workflows",
+        location=LOCATION,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_list_workflows]
+
+    # [START how_to_delete_workflow]
+    delete_workflow = WorkflowsDeleteWorkflowOperator(
+        task_id="delete_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
+    )
+    # [END how_to_delete_workflow]
+
+    # [START how_to_create_execution]
+    create_execution = WorkflowsCreateExecutionOperator(
+        task_id="create_execution",
+        location=LOCATION,
+        project_id=PROJECT_ID,
+        execution=EXECUTION,
+        workflow_id=WORKFLOW_ID,
+    )
+    # [END how_to_create_execution]
+
+    # [START how_to_wait_for_execution]
+    wait_for_execution = WorkflowExecutionSensor(
+        task_id="wait_for_execution",
+        location=LOCATION,
+        project_id=PROJECT_ID,
+        workflow_id=WORKFLOW_ID,
+        execution_id='{{ task_instance.xcom_pull("create_execution", key="execution_id") }}',
+    )
+    # [END how_to_wait_for_execution]
+
+    # [START how_to_get_execution]
+    get_execution = WorkflowsGetExecutionOperator(
+        task_id="get_execution",
+        location=LOCATION,
+        project_id=PROJECT_ID,
+        workflow_id=WORKFLOW_ID,
+        execution_id='{{ task_instance.xcom_pull("create_execution", key="execution_id") }}',
+    )
+    # [END how_to_get_execution]
+
+    # [START how_to_list_executions]
+    list_executions = WorkflowsListExecutionsOperator(
+        task_id="list_executions", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
+    )
+    # [END how_to_list_executions]
+
+    create_workflow_for_cancel = WorkflowsCreateWorkflowOperator(
+        task_id="create_workflow_for_cancel",
+        location=LOCATION,
+        project_id=PROJECT_ID,
+        workflow=SLEEP_WORKFLOW,
+        workflow_id=SLEEP_WORKFLOW_ID,
+    )
+
+    create_execution_for_cancel = WorkflowsCreateExecutionOperator(
+        task_id="create_execution_for_cancel",
+        location=LOCATION,
+        project_id=PROJECT_ID,
+        execution=EXECUTION,
+        workflow_id=SLEEP_WORKFLOW_ID,
+    )
+
+    # [START how_to_cancel_execution]
+    cancel_execution = WorkflowsCancelExecutionOperator(
+        task_id="cancel_execution",
+        location=LOCATION,
+        project_id=PROJECT_ID,
+        workflow_id=SLEEP_WORKFLOW_ID,
+        execution_id='{{ task_instance.xcom_pull("create_execution_for_cancel", key="execution_id") }}',
+    )
+    # [END how_to_cancel_execution]
+
+    create_workflow >> update_workflows >> [get_workflow, list_workflows]
+    update_workflows >> [create_execution, create_execution_for_cancel]
+
+    create_execution >> wait_for_execution >> [get_execution, list_executions]
+    create_workflow_for_cancel >> create_execution_for_cancel >> cancel_execution
+
+    [cancel_execution, list_executions] >> delete_workflow
+
+
+if __name__ == '__main__':
+    dag.clear(dag_run_state=None)
+    dag.run()
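
The example DAG above passes the execution ID between tasks through XCom:
WorkflowsCreateExecutionOperator pushes it under the key "execution_id", and the
sensor and get/cancel tasks pull it back with templated strings. A minimal sketch
of a downstream task consuming the same XCom values (the summarize task here is
hypothetical and not part of this commit):

    from airflow.operators.python import PythonOperator

    def summarize_execution(**context):
        ti = context["ti"]
        # Key pushed explicitly by WorkflowsCreateExecutionOperator.execute()
        execution_id = ti.xcom_pull(task_ids="create_execution", key="execution_id")
        # Operator return values are pushed automatically under "return_value",
        # so the dict built by Execution.to_dict() in WorkflowsGetExecutionOperator
        # can be pulled without an explicit key.
        execution = ti.xcom_pull(task_ids="get_execution")
        print(f"Execution {execution_id}: {execution}")

    summarize = PythonOperator(task_id="summarize_execution", python_callable=summarize_execution)
    get_execution >> summarize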
diff --git a/airflow/providers/google/cloud/hooks/workflows.py b/airflow/providers/google/cloud/hooks/workflows.py
new file mode 100644
index 0000000..6c78350
--- /dev/null
+++ b/airflow/providers/google/cloud/hooks/workflows.py
@@ -0,0 +1,401 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Dict, Optional, Sequence, Tuple, Union
+
+from google.api_core.operation import Operation
+from google.api_core.retry import Retry
+
+# pylint: disable=no-name-in-module
+from google.cloud.workflows.executions_v1beta import Execution, ExecutionsClient
+from google.cloud.workflows.executions_v1beta.services.executions.pagers import ListExecutionsPager
+from google.cloud.workflows_v1beta import Workflow, WorkflowsClient
+from google.cloud.workflows_v1beta.services.workflows.pagers import ListWorkflowsPager
+from google.protobuf.field_mask_pb2 import FieldMask
+
+from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
+
+# pylint: enable=no-name-in-module
+
+
+class WorkflowsHook(GoogleBaseHook):
+    """
+    Hook for Google Cloud Workflows APIs.
+
+    All methods in the hook that use ``project_id`` must be called with
+    keyword arguments rather than positional ones.
+    """
+
+    def get_workflows_client(self) -> WorkflowsClient:
+        """Returns WorkflowsClient."""
+        return WorkflowsClient(credentials=self._get_credentials(), client_info=self.client_info)
+
+    def get_executions_client(self) -> ExecutionsClient:
+        """Returns ExecutionsClient."""
+        return ExecutionsClient(credentials=self._get_credentials(), client_info=self.client_info)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_workflow(
+        self,
+        workflow: Dict,
+        workflow_id: str,
+        location: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> Operation:
+        """
+        Creates a new workflow. If a workflow with the specified name
+        already exists in the specified project and location, the long
+        running operation will return an
+        [ALREADY_EXISTS][google.rpc.Code.ALREADY_EXISTS] error.
+
+        :param workflow: Required. Workflow to be created.
+        :type workflow: Dict
+        :param workflow_id: Required. The ID of the workflow to be created.
+        :type workflow_id: str
+        :param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
+        :type project_id: str
+        :param location: Required. The GCP region in which to handle the request.
+        :type location: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        metadata = metadata or ()
+        client = self.get_workflows_client()
+        parent = f"projects/{project_id}/locations/{location}"
+        return client.create_workflow(
+            request={"parent": parent, "workflow": workflow, "workflow_id": workflow_id},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_workflow(
+        self,
+        workflow_id: str,
+        location: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> Workflow:
+        """
+        Gets details of a single Workflow.
+
+        :param workflow_id: Required. The ID of the workflow to be retrieved.
+        :type workflow_id: str
+        :param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
+        :type project_id: str
+        :param location: Required. The GCP region in which to handle the request.
+        :type location: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        metadata = metadata or ()
+        client = self.get_workflows_client()
+        name = f"projects/{project_id}/locations/{location}/workflows/{workflow_id}"
+        return client.get_workflow(request={"name": name}, retry=retry, timeout=timeout, metadata=metadata)
+
+    def update_workflow(
+        self,
+        workflow: Union[Dict, Workflow],
+        update_mask: Optional[FieldMask] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> Operation:
+        """
+        Updates an existing workflow.
+        Running this method has no impact on already running
+        executions of the workflow. A new revision of the
+        workflow may be created as a result of a successful
+        update operation. In that case, the new revision will be
+        used in new workflow executions.
+
+        :param workflow: Required. Workflow to be updated.
+        :type workflow: Union[Dict, Workflow]
+        :param update_mask: List of fields to be updated. If not present,
+            the entire workflow will be updated.
+        :type update_mask: FieldMask
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        metadata = metadata or ()
+        client = self.get_workflows_client()
+        return client.update_workflow(
+            request={"workflow": workflow, "update_mask": update_mask},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def delete_workflow(
+        self,
+        workflow_id: str,
+        location: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> Operation:
+        """
+        Deletes a workflow with the specified name.
+        This method also cancels and deletes all running
+        executions of the workflow.
+
+        :param workflow_id: Required. The ID of the workflow to be deleted.
+        :type workflow_id: str
+        :param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
+        :type project_id: str
+        :param location: Required. The GCP region in which to handle the request.
+        :type location: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        metadata = metadata or ()
+        client = self.get_workflows_client()
+        name = f"projects/{project_id}/locations/{location}/workflows/{workflow_id}"
+        return client.delete_workflow(request={"name": name}, retry=retry, timeout=timeout, metadata=metadata)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def list_workflows(
+        self,
+        location: str,
+        project_id: str,
+        filter_: Optional[str] = None,
+        order_by: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> ListWorkflowsPager:
+        """
+        Lists Workflows in a given project and location.
+        The default order is not specified.
+
+        :param filter_: Filter to restrict results to specific workflows.
+        :type filter_: str
+        :param order_by: Comma-separated list of fields that
+            specify the order of the results. Default sorting order for a field is ascending.
+            To specify descending order for a field, append a "desc" suffix.
+            If not specified, the results will be returned in an unspecified order.
+        :type order_by: str
+        :param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
+        :type project_id: str
+        :param location: Required. The GCP region in which to handle the request.
+        :type location: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        metadata = metadata or ()
+        client = self.get_workflows_client()
+        parent = f"projects/{project_id}/locations/{location}"
+
+        return client.list_workflows(
+            request={"parent": parent, "filter": filter_, "order_by": order_by},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_execution(
+        self,
+        workflow_id: str,
+        location: str,
+        project_id: str,
+        execution: Dict,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> Execution:
+        """
+        Creates a new execution using the latest revision of
+        the given workflow.
+
+        :param execution: Required. Input parameters of the execution represented as a dictionary.
+        :type execution: Dict
+        :param workflow_id: Required. The ID of the workflow.
+        :type workflow_id: str
+        :param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
+        :type project_id: str
+        :param location: Required. The GCP region in which to handle the request.
+        :type location: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        metadata = metadata or ()
+        client = self.get_executions_client()
+        parent = f"projects/{project_id}/locations/{location}/workflows/{workflow_id}"
+        return client.create_execution(
+            request={"parent": parent, "execution": execution},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_execution(
+        self,
+        workflow_id: str,
+        execution_id: str,
+        location: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> Execution:
+        """
+        Returns an execution for the given ``workflow_id`` and ``execution_id``.
+
+        :param workflow_id: Required. The ID of the workflow.
+        :type workflow_id: str
+        :param execution_id: Required. The ID of the execution.
+        :type execution_id: str
+        :param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
+        :type project_id: str
+        :param location: Required. The GCP region in which to handle the request.
+        :type location: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        metadata = metadata or ()
+        client = self.get_executions_client()
+        name = f"projects/{project_id}/locations/{location}/workflows/{workflow_id}/executions/{execution_id}"
+        return client.get_execution(request={"name": name}, retry=retry, timeout=timeout, metadata=metadata)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def cancel_execution(
+        self,
+        workflow_id: str,
+        execution_id: str,
+        location: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> Execution:
+        """
+        Cancels an execution using the given ``workflow_id`` and ``execution_id``.
+
+        :param workflow_id: Required. The ID of the workflow.
+        :type workflow_id: str
+        :param execution_id: Required. The ID of the execution.
+        :type execution_id: str
+        :param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
+        :type project_id: str
+        :param location: Required. The GCP region in which to handle the request.
+        :type location: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        metadata = metadata or ()
+        client = self.get_executions_client()
+        name = f"projects/{project_id}/locations/{location}/workflows/{workflow_id}/executions/{execution_id}"
+        return client.cancel_execution(
+            request={"name": name}, retry=retry, timeout=timeout, metadata=metadata
+        )
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def list_executions(
+        self,
+        workflow_id: str,
+        location: str,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> ListExecutionsPager:
+        """
+        Returns a list of executions which belong to the
+        workflow with the given name. The method returns
+        executions of all workflow revisions. Returned
+        executions are ordered by their start time (newest
+        first).
+
+        :param workflow_id: Required. The ID of the workflow whose executions will be listed.
+        :type workflow_id: str
+        :param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
+        :type project_id: str
+        :param location: Required. The GCP region in which to handle the request.
+        :type location: str
+        :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        metadata = metadata or ()
+        client = self.get_executions_client()
+        parent = f"projects/{project_id}/locations/{location}/workflows/{workflow_id}"
+        return client.list_executions(
+            request={"parent": parent}, retry=retry, timeout=timeout, metadata=metadata
+        )
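
The hook above can also be driven directly, for instance from a PythonOperator
callable. A minimal sketch, assuming the default "google_cloud_default" connection
and reusing the LOCATION, PROJECT_ID, WORKFLOW and WORKFLOW_ID constants from the
example DAG (not part of this commit):

    from airflow.providers.google.cloud.hooks.workflows import WorkflowsHook

    hook = WorkflowsHook(gcp_conn_id="google_cloud_default")

    # create_workflow returns a google.api_core.operation.Operation;
    # result() blocks until the long-running operation completes.
    operation = hook.create_workflow(
        workflow=WORKFLOW,
        workflow_id=WORKFLOW_ID,
        location=LOCATION,
        project_id=PROJECT_ID,
    )
    workflow = operation.result()

    # Executions are created synchronously; the generated resource name
    # ends with the execution ID.
    execution = hook.create_execution(
        workflow_id=WORKFLOW_ID,
        location=LOCATION,
        project_id=PROJECT_ID,
        execution={"argument": ""},
    )
    execution_id = execution.name.split("/")[-1]

Note that, because of the fallback_to_default_project_id decorator, these methods
must be called with keyword arguments.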
diff --git a/airflow/providers/google/cloud/operators/workflows.py b/airflow/providers/google/cloud/operators/workflows.py
new file mode 100644
index 0000000..c7fc96d
--- /dev/null
+++ b/airflow/providers/google/cloud/operators/workflows.py
@@ -0,0 +1,714 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import hashlib
+import json
+import re
+import uuid
+from datetime import datetime, timedelta
+from typing import Dict, Optional, Sequence, Tuple, Union
+
+import pytz
+from google.api_core.exceptions import AlreadyExists
+from google.api_core.retry import Retry
+
+# pylint: disable=no-name-in-module
+from google.cloud.workflows.executions_v1beta import Execution
+from google.cloud.workflows_v1beta import Workflow
+
+# pylint: enable=no-name-in-module
+from google.protobuf.field_mask_pb2 import FieldMask
+
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.hooks.workflows import WorkflowsHook
+
+
+class WorkflowsCreateWorkflowOperator(BaseOperator):
+    """
+    Creates a new workflow. If a workflow with the specified name
+    already exists in the specified project and location, the long
+    running operation will return an
+    [ALREADY_EXISTS][google.rpc.Code.ALREADY_EXISTS] error.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:WorkflowsCreateWorkflowOperator`
+
+    :param workflow: Required. Workflow to be created.
+    :type workflow: Dict
+    :param workflow_id: Required. The ID of the workflow to be created.
+    :type workflow_id: str
+    :param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
+    :type project_id: str
+    :param location: Required. The GCP region in which to handle the request.
+    :type location: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    """
+
+    template_fields = ("location", "workflow", "workflow_id")
+    template_fields_renderers = {"workflow": "json"}
+
+    def __init__(
+        self,
+        *,
+        workflow: Dict,
+        workflow_id: str,
+        location: str,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        force_rerun: bool = False,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+
+        self.workflow = workflow
+        self.workflow_id = workflow_id
+        self.location = location
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+        self.force_rerun = force_rerun
+
+    def _workflow_id(self, context):
+        if self.workflow_id and not self.force_rerun:
+            # If the user provides a workflow_id, ensuring idempotency
+            # is their responsibility
+            return self.workflow_id
+
+        if self.force_rerun:
+            hash_base = str(uuid.uuid4())
+        else:
+            hash_base = json.dumps(self.workflow, sort_keys=True)
+
+        # The allowed length of workflow_id is limited, so we use
+        # a hash of the full identifying information instead
+        exec_date = context['execution_date'].isoformat()
+        base = f"airflow_{self.dag_id}_{self.task_id}_{exec_date}_{hash_base}"
+        workflow_id = hashlib.md5(base.encode()).hexdigest()
+        return re.sub(r"[:\-+.]", "_", workflow_id)
+
+    def execute(self, context):
+        hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        workflow_id = self._workflow_id(context)
+
+        self.log.info("Creating workflow")
+        try:
+            operation = hook.create_workflow(
+                workflow=self.workflow,
+                workflow_id=workflow_id,
+                location=self.location,
+                project_id=self.project_id,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            workflow = operation.result()
+        except AlreadyExists:
+            workflow = hook.get_workflow(
+                workflow_id=workflow_id,
+                location=self.location,
+                project_id=self.project_id,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+        return Workflow.to_dict(workflow)
+
+
+class WorkflowsUpdateWorkflowOperator(BaseOperator):
+    """
+    Updates an existing workflow.
+    Running this method has no impact on already running
+    executions of the workflow. A new revision of the
+    workflow may be created as a result of a successful
+    update operation. In that case, the new revision will be
+    used in new workflow executions.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:WorkflowsUpdateWorkflowOperator`
+
+    :param workflow_id: Required. The ID of the workflow to be updated.
+    :type workflow_id: str
+    :param location: Required. The GCP region in which to handle the request.
+    :type location: str
+    :param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
+    :type project_id: str
+    :param update_mask: List of fields to be updated. If not present,
+        the entire workflow will be updated.
+    :type update_mask: FieldMask
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    """
+
+    template_fields = ("workflow_id", "update_mask")
+    template_fields_renderers = {"update_mask": "json"}
+
+    def __init__(
+        self,
+        *,
+        workflow_id: str,
+        location: str,
+        project_id: Optional[str] = None,
+        update_mask: Optional[FieldMask] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+
+        self.workflow_id = workflow_id
+        self.location = location
+        self.project_id = project_id
+        self.update_mask = update_mask
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+
+        workflow = hook.get_workflow(
+            workflow_id=self.workflow_id,
+            project_id=self.project_id,
+            location=self.location,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        self.log.info("Updating workflow")
+        operation = hook.update_workflow(
+            workflow=workflow,
+            update_mask=self.update_mask,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        workflow = operation.result()
+        return Workflow.to_dict(workflow)
+
+
+class WorkflowsDeleteWorkflowOperator(BaseOperator):
+    """
+    Deletes a workflow with the specified name.
+    This method also cancels and deletes all running
+    executions of the workflow.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:WorkflowsDeleteWorkflowOperator`
+
+    :param workflow_id: Required. The ID of the workflow to be deleted.
+    :type workflow_id: str
+    :param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
+    :type project_id: str
+    :param location: Required. The GCP region in which to handle the request.
+    :type location: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    """
+
+    template_fields = ("location", "workflow_id")
+
+    def __init__(
+        self,
+        *,
+        workflow_id: str,
+        location: str,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+
+        self.workflow_id = workflow_id
+        self.location = location
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Deleting workflow %s", self.workflow_id)
+        operation = hook.delete_workflow(
+            workflow_id=self.workflow_id,
+            location=self.location,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        operation.result()
+
+
+class WorkflowsListWorkflowsOperator(BaseOperator):
+    """
+    Lists Workflows in a given project and location.
+    The default order is not specified.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:WorkflowsListWorkflowsOperator`
+
+    :param filter_: Filter to restrict results to specific workflows.
+    :type filter_: str
+    :param order_by: Comma-separated list of fields that
+        specify the order of the results. Default sorting order for a field is ascending.
+        To specify descending order for a field, append a "desc" suffix.
+        If not specified, the results will be returned in an unspecified order.
+    :type order_by: str
+    :param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
+    :type project_id: str
+    :param location: Required. The GCP region in which to handle the request.
+    :type location: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    """
+
+    template_fields = ("location", "order_by", "filter_")
+
+    def __init__(
+        self,
+        *,
+        location: str,
+        project_id: Optional[str] = None,
+        filter_: Optional[str] = None,
+        order_by: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+
+        self.filter_ = filter_
+        self.order_by = order_by
+        self.location = location
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Retrieving workflows")
+        workflows_iter = hook.list_workflows(
+            filter_=self.filter_,
+            order_by=self.order_by,
+            location=self.location,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return [Workflow.to_dict(w) for w in workflows_iter]
+
+
+class WorkflowsGetWorkflowOperator(BaseOperator):
+    """
+    Gets details of a single Workflow.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:WorkflowsGetWorkflowOperator`
+
+    :param workflow_id: Required. The ID of the workflow to be retrieved.
+    :type workflow_id: str
+    :param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
+    :type project_id: str
+    :param location: Required. The GCP region in which to handle the request.
+    :type location: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    """
+
+    template_fields = ("location", "workflow_id")
+
+    def __init__(
+        self,
+        *,
+        workflow_id: str,
+        location: str,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+
+        self.workflow_id = workflow_id
+        self.location = location
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Retrieving workflow")
+        workflow = hook.get_workflow(
+            workflow_id=self.workflow_id,
+            location=self.location,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Workflow.to_dict(workflow)
+
+
+class WorkflowsCreateExecutionOperator(BaseOperator):
+    """
+    Creates a new execution using the latest revision of
+    the given workflow.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:WorkflowsCreateExecutionOperator`
+
+    :param execution: Required. Execution to be created.
+    :type execution: Dict
+    :param workflow_id: Required. The ID of the workflow.
+    :type workflow_id: str
+    :param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
+    :type project_id: str
+    :param location: Required. The GCP region in which to handle the request.
+    :type location: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    """
+
+    template_fields = ("location", "workflow_id", "execution")
+    template_fields_renderers = {"execution": "json"}
+
+    def __init__(
+        self,
+        *,
+        workflow_id: str,
+        execution: Dict,
+        location: str,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+
+        self.workflow_id = workflow_id
+        self.execution = execution
+        self.location = location
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Creating execution")
+        execution = hook.create_execution(
+            workflow_id=self.workflow_id,
+            execution=self.execution,
+            location=self.location,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        execution_id = execution.name.split("/")[-1]
+        self.xcom_push(context, key="execution_id", value=execution_id)
+        return Execution.to_dict(execution)
+
+
+class WorkflowsCancelExecutionOperator(BaseOperator):
+    """
+    Cancels an execution using the given ``workflow_id`` and ``execution_id``.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:WorkflowsCancelExecutionOperator`
+
+    :param workflow_id: Required. The ID of the workflow.
+    :type workflow_id: str
+    :param execution_id: Required. The ID of the execution.
+    :type execution_id: str
+    :param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
+    :type project_id: str
+    :param location: Required. The GCP region in which to handle the request.
+    :type location: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    """
+
+    template_fields = ("location", "workflow_id", "execution_id")
+
+    def __init__(
+        self,
+        *,
+        workflow_id: str,
+        execution_id: str,
+        location: str,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+
+        self.workflow_id = workflow_id
+        self.execution_id = execution_id
+        self.location = location
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Canceling execution %s", self.execution_id)
+        execution = hook.cancel_execution(
+            workflow_id=self.workflow_id,
+            execution_id=self.execution_id,
+            location=self.location,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Execution.to_dict(execution)
+
+
+class WorkflowsListExecutionsOperator(BaseOperator):
+    """
+    Returns a list of executions which belong to the
+    workflow with the given name. The method returns
+    executions of all workflow revisions. Returned
+    executions are ordered by their start time (newest
+    first).
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:WorkflowsListExecutionsOperator`
+
+    :param workflow_id: Required. The ID of the workflow whose executions will be listed.
+    :type workflow_id: str
+    :param start_date_filter: If passed, only executions started after this date will be returned.
+        By default the operator returns executions from the last 60 minutes
+    :type start_date_filter: datetime
+    :param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
+    :type project_id: str
+    :param location: Required. The GCP region in which to handle the request.
+    :type location: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    """
+
+    template_fields = ("location", "workflow_id")
+
+    def __init__(
+        self,
+        *,
+        workflow_id: str,
+        location: str,
+        start_date_filter: Optional[datetime] = None,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+
+        self.workflow_id = workflow_id
+        self.location = location
+        self.start_date_filter = start_date_filter or datetime.now(tz=pytz.UTC) - timedelta(minutes=60)
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Retrieving executions for workflow %s", self.workflow_id)
+        execution_iter = hook.list_executions(
+            workflow_id=self.workflow_id,
+            location=self.location,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+
+        return [Execution.to_dict(e) for e in execution_iter if e.start_time > self.start_date_filter]
+
+
+class WorkflowsGetExecutionOperator(BaseOperator):
+    """
+    Returns an execution for the given ``workflow_id`` and ``execution_id``.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:WorkflowsGetExecutionOperator`
+
+    :param workflow_id: Required. The ID of the workflow.
+    :type workflow_id: str
+    :param execution_id: Required. The ID of the execution.
+    :type execution_id: str
+    :param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
+    :type project_id: str
+    :param location: Required. The GCP region in which to handle the request.
+    :type location: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    """
+
+    template_fields = ("location", "workflow_id", "execution_id")
+
+    def __init__(
+        self,
+        *,
+        workflow_id: str,
+        execution_id: str,
+        location: str,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+
+        self.workflow_id = workflow_id
+        self.execution_id = execution_id
+        self.location = location
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Retrieving execution %s for workflow %s", self.execution_id, self.workflow_id)
+        execution = hook.get_execution(
+            workflow_id=self.workflow_id,
+            execution_id=self.execution_id,
+            location=self.location,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Execution.to_dict(execution)
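
Worth calling out is the idempotency scheme in
WorkflowsCreateWorkflowOperator._workflow_id above: when no explicit workflow_id is
given (or force_rerun is set), the operator derives the ID by hashing the DAG ID,
task ID, execution date and workflow body, so a retried task instance regenerates
the same ID and lands in the AlreadyExists branch instead of creating a duplicate.
A standalone sketch of the same derivation, with placeholder identifiers standing
in for self.dag_id, self.task_id and context["execution_date"]:

    import hashlib
    import json

    workflow = {"description": "Test workflow", "source_contents": "..."}
    dag_id, task_id = "example_cloud_workflows", "create_workflow"
    exec_date = "2021-01-28T00:00:00+00:00"  # context["execution_date"].isoformat()

    hash_base = json.dumps(workflow, sort_keys=True)  # str(uuid.uuid4()) if force_rerun
    base = f"airflow_{dag_id}_{task_id}_{exec_date}_{hash_base}"
    workflow_id = hashlib.md5(base.encode()).hexdigest()
    # Identical inputs always yield the identical workflow_id, which is what
    # makes the create-or-fetch pattern in execute() safe to retry.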
diff --git a/airflow/providers/google/cloud/sensors/workflows.py b/airflow/providers/google/cloud/sensors/workflows.py
new file mode 100644
index 0000000..5950458
--- /dev/null
+++ b/airflow/providers/google/cloud/sensors/workflows.py
@@ -0,0 +1,123 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Optional, Sequence, Set, Tuple, Union
+
+from google.api_core.retry import Retry
+from google.cloud.workflows.executions_v1beta import Execution
+
+from airflow.exceptions import AirflowException
+from airflow.providers.google.cloud.hooks.workflows import WorkflowsHook
+from airflow.sensors.base import BaseSensorOperator
+
+
+class WorkflowExecutionSensor(BaseSensorOperator):
+    """
+    Checks state of an execution for the given ``workflow_id`` and ``execution_id``.
+
+    :param workflow_id: Required. The ID of the workflow.
+    :type workflow_id: str
+    :param execution_id: Required. The ID of the execution.
+    :type execution_id: str
+    :param project_id: Required. The ID of the Google Cloud project the workflow belongs to.
+    :type project_id: str
+    :param location: Required. The GCP region in which to handle the request.
+    :type location: str
+    :param success_states: Execution states to be considered as successful; by default
+        only the ``SUCCEEDED`` state is used.
+    :type success_states: Set[Execution.State]
+    :param failure_states: Execution states to be considered as failures; by default
+        the ``FAILED`` and ``CANCELLED`` states are used.
+    :type failure_states: Set[Execution.State]
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param request_timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type request_timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    """
+
+    template_fields = ("location", "workflow_id", "execution_id")
+
+    def __init__(
+        self,
+        *,
+        workflow_id: str,
+        execution_id: str,
+        location: str,
+        project_id: str,
+        success_states: Optional[Set[Execution.State]] = None,
+        failure_states: Optional[Set[Execution.State]] = None,
+        retry: Optional[Retry] = None,
+        request_timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+
+        self.success_states = success_states or {Execution.State.SUCCEEDED}
+        self.failure_states = failure_states or {Execution.State.FAILED, Execution.State.CANCELLED}
+        self.workflow_id = workflow_id
+        self.execution_id = execution_id
+        self.location = location
+        self.project_id = project_id
+        self.retry = retry
+        self.request_timeout = request_timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def poke(self, context):
+        hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Checking state of execution %s for workflow %s", self.execution_id, self.workflow_id)
+        execution: Execution = hook.get_execution(
+            workflow_id=self.workflow_id,
+            execution_id=self.execution_id,
+            location=self.location,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.request_timeout,
+            metadata=self.metadata,
+        )
+
+        state = execution.state
+        if state in self.failure_states:
+            raise AirflowException(
+                f"Execution {self.execution_id} for workflow {self.execution_id} "
+                f"failed and is in `{state}` state",
+            )
+
+        if state in self.success_states:
+            self.log.info(
+                "Execution %s for workflow %s completed with state: %s",
+                self.execution_id,
+                self.workflow_id,
+                state,
+            )
+            return True
+
+        self.log.info(
+            "Execution %s for workflow %s does not completed yet, current state: %s",
+            self.execution_id,
+            self.workflow_id,
+            state,
+        )
+        return False
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 67b7af4..39ba434 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -301,6 +301,11 @@ integrations:
       - /docs/apache-airflow-providers-google/operators/cloud/natural_language.rst
     logo: /integration-logos/gcp/Cloud-NLP.png
     tags: [gcp]
+  - integration-name: Google Cloud Workflows
+    external-doc-url: https://cloud.google.com/workflows/
+    how-to-guide:
+      - /docs/apache-airflow-providers-google/operators/cloud/workflows.rst
+    tags: [gcp]
 
 operators:
   - integration-name: Google Ads
@@ -401,6 +406,9 @@ operators:
   - integration-name: Google Cloud Vision
     python-modules:
       - airflow.providers.google.cloud.operators.vision
+  - integration-name: Google Cloud Workflows
+    python-modules:
+      - airflow.providers.google.cloud.operators.workflows
   - integration-name: Google Cloud Firestore
     python-modules:
       - airflow.providers.google.firebase.operators.firestore
@@ -445,6 +453,9 @@ sensors:
   - integration-name: Google Cloud Pub/Sub
     python-modules:
       - airflow.providers.google.cloud.sensors.pubsub
+  - integration-name: Google Cloud Workflows
+    python-modules:
+      - airflow.providers.google.cloud.sensors.workflows
   - integration-name: Google Campaign Manager
     python-modules:
       - airflow.providers.google.marketing_platform.sensors.campaign_manager
@@ -565,6 +576,9 @@ hooks:
   - integration-name: Google Cloud Vision
     python-modules:
       - airflow.providers.google.cloud.hooks.vision
+  - integration-name: Google Cloud Workflows
+    python-modules:
+      - airflow.providers.google.cloud.hooks.workflows
   - integration-name: Google
     python-modules:
       - airflow.providers.google.common.hooks.base_google
diff --git a/docs/apache-airflow-providers-google/operators/cloud/workflows.rst b/docs/apache-airflow-providers-google/operators/cloud/workflows.rst
new file mode 100644
index 0000000..551a7ca
--- /dev/null
+++ b/docs/apache-airflow-providers-google/operators/cloud/workflows.rst
@@ -0,0 +1,185 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Google Cloud Workflows Operators
+================================
+
+You can use Workflows to create serverless workflows that link a series of serverless tasks together
+in an order you define. Combine the power of Google Cloud's APIs, serverless products like Cloud
+Functions and Cloud Run, and calls to external APIs to create flexible serverless applications.
+
+For more information about the service, visit the
+`Workflows product documentation <https://cloud.google.com/workflows/docs/overview>`__.
+
+.. contents::
+  :depth: 1
+  :local:
+
+Prerequisite Tasks
+------------------
+
+.. include:: /operators/_partials/prerequisite_tasks.rst
+
+
+.. _howto/operator:WorkflowsCreateWorkflowOperator:
+
+Create workflow
+===============
+
+To create a workflow use
+:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsCreateWorkflowOperator`.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+      :language: python
+      :dedent: 4
+      :start-after: [START how_to_create_workflow]
+      :end-before: [END how_to_create_workflow]
+
+The workflow should be defined in a similar way to this example:
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+      :language: python
+      :dedent: 0
+      :start-after: [START how_to_define_workflow]
+      :end-before: [END how_to_define_workflow]
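+
+If you cannot open the included file, the sketch below shows what such a
+definition might look like; the workflow YAML and all names in it are
+illustrative assumptions, not the contents of the example DAG:
+
+.. code-block:: python
+
+    # Hypothetical workflow: calls an HTTP endpoint and returns its body.
+    # The ``source_contents`` field is assumed to carry the workflow YAML.
+    WORKFLOW_CONTENT = """
+    - getMessage:
+        call: http.get
+        args:
+            url: https://example.com/api/message
+        result: message
+    - returnOutput:
+        return: ${message.body}
+    """
+
+    WORKFLOW = {
+        "description": "Sample workflow",
+        "labels": {"airflow-version": "dev"},
+        "source_contents": WORKFLOW_CONTENT,
+    }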
+
+For more information about authoring workflows, check the official
+`product documentation <https://cloud.google.com/workflows/docs/overview>`__.
+
+
+.. _howto/operator:WorkflowsUpdateWorkflowOperator:
+
+Update workflow
+===============
+
+To update a workflow use
+:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsUpdateWorkflowOperator`.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+      :language: python
+      :dedent: 4
+      :start-after: [START how_to_update_workflow]
+      :end-before: [END how_to_update_workflow]
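+
+A minimal sketch of such a task follows; all identifiers below are
+assumptions made for illustration:
+
+.. code-block:: python
+
+    from airflow.providers.google.cloud.operators.workflows import (
+        WorkflowsUpdateWorkflowOperator,
+    )
+
+    update_workflow = WorkflowsUpdateWorkflowOperator(
+        task_id="update_workflow",
+        workflow_id="airflow-test-workflow",
+        location="us-central1",
+        project_id="my-project",
+        # Only the fields named in the mask are overwritten; the paths
+        # here are assumed for the sake of the example.
+        update_mask={"paths": ["name", "description"]},
+    )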
+
+.. _howto/operator:WorkflowsGetWorkflowOperator:
+
+Get workflow
+============
+
+To get a workflow use
+:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsGetWorkflowOperator`.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+      :language: python
+      :dedent: 4
+      :start-after: [START how_to_get_workflow]
+      :end-before: [END how_to_get_workflow]
+
+.. _howto/operator:WorkflowsListWorkflowsOperator:
+
+List workflows
+==============
+
+To list workflows use
+:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsListWorkflowsOperator`.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+      :language: python
+      :dedent: 4
+      :start-after: [START how_to_list_workflows]
+      :end-before: [END how_to_list_workflows]
+
+.. _howto/operator:WorkflowsDeleteWorkflowOperator:
+
+Delete workflow
+===============
+
+To delete a workflow use
+:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsDeleteWorkflowOperator`.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+      :language: python
+      :dedent: 4
+      :start-after: [START how_to_delete_workflow]
+      :end-before: [END how_to_delete_workflow]
+
+.. _howto/operator:WorkflowsCreateExecutionOperator:
+
+Create execution
+================
+
+To create an execution use
+:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsCreateExecutionOperator`.
+This operator is not idempotent due to an API limitation.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+      :language: python
+      :dedent: 4
+      :start-after: [START how_to_create_execution]
+      :end-before: [END how_to_create_execution]
+
+The create operator does not wait for the execution to complete. To wait for the execution result use
+:class:`~airflow.providers.google.cloud.sensors.workflows.WorkflowExecutionSensor`.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+      :language: python
+      :dedent: 4
+      :start-after: [START how_to_wait_for_execution]
+      :end-before: [END how_to_wait_for_execution]
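+
+Since ``execution_id`` is a templated field and the create operator pushes the
+new execution's ID to XCom under the ``execution_id`` key, the two tasks can be
+wired together as in this sketch (task and workflow names are assumptions):
+
+.. code-block:: python
+
+    create_execution = WorkflowsCreateExecutionOperator(
+        task_id="create_execution",
+        workflow_id="airflow-test-workflow",
+        execution={"argument": ""},
+        location="us-central1",
+        project_id="my-project",
+    )
+
+    wait_for_execution = WorkflowExecutionSensor(
+        task_id="wait_for_execution",
+        workflow_id="airflow-test-workflow",
+        execution_id='{{ task_instance.xcom_pull("create_execution", key="execution_id") }}',
+        location="us-central1",
+        project_id="my-project",
+    )
+
+    create_execution >> wait_for_execution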
+
+.. _howto/operator:WorkflowsGetExecutionOperator:
+
+Get execution
+=============
+
+To get an execution use
+:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsGetExecutionOperator`.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+      :language: python
+      :dedent: 4
+      :start-after: [START how_to_get_execution]
+      :end-before: [END how_to_get_execution]
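+
+The operator returns the execution converted to a ``dict`` (and so pushes it to
+XCom for downstream tasks). A minimal sketch, with assumed names, that fetches
+the execution created earlier:
+
+.. code-block:: python
+
+    get_execution = WorkflowsGetExecutionOperator(
+        task_id="get_execution",
+        workflow_id="airflow-test-workflow",
+        execution_id='{{ task_instance.xcom_pull("create_execution", key="execution_id") }}',
+        location="us-central1",
+        project_id="my-project",
+    )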
+
+.. _howto/operator:WorkflowsListExecutionsOperator:
+
+List executions
+===============
+
+To list executions use
+:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsListExecutionsOperator`.
+By default, this operator returns only executions from the last 60 minutes.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+      :language: python
+      :dedent: 4
+      :start-after: [START how_to_list_executions]
+      :end-before: [END how_to_list_executions]
+
+.. _howto/operator:WorkflowsCancelExecutionOperator:
+
+Cancel execution
+================
+
+To cancel an execution use
+:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsCancelExecutionOperator`.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+      :language: python
+      :dedent: 4
+      :start-after: [START how_to_cancel_execution]
+      :end-before: [END how_to_cancel_execution]
diff --git a/setup.py b/setup.py
index 6e2a2f7..67e6a85 100644
--- a/setup.py
+++ b/setup.py
@@ -278,6 +278,7 @@ flask_oauth = [
 google = [
     'PyOpenSSL',
     'google-ads>=4.0.0,<8.0.0',
+    'google-api-core>=1.25.1,<2.0.0',
     'google-api-python-client>=1.6.0,<2.0.0',
     'google-auth>=1.0.0,<2.0.0',
     'google-auth-httplib2>=0.0.1',
@@ -305,6 +306,7 @@ google = [
     'google-cloud-translate>=1.5.0,<2.0.0',
     'google-cloud-videointelligence>=1.7.0,<2.0.0',
     'google-cloud-vision>=0.35.2,<2.0.0',
+    'google-cloud-workflows>=0.1.0,<2.0.0',
     'grpcio-gcp>=0.2.2',
     'json-merge-patch~=0.2',
     'pandas-gbq',
diff --git a/tests/providers/google/cloud/hooks/test_workflows.py b/tests/providers/google/cloud/hooks/test_workflows.py
new file mode 100644
index 0000000..4f3d4d0
--- /dev/null
+++ b/tests/providers/google/cloud/hooks/test_workflows.py
@@ -0,0 +1,256 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import mock
+
+from airflow.providers.google.cloud.hooks.workflows import WorkflowsHook
+
+BASE_PATH = "airflow.providers.google.cloud.hooks.workflows.{}"
+LOCATION = "europe-west1"
+WORKFLOW_ID = "workflow_id"
+EXECUTION_ID = "execution_id"
+WORKFLOW = {"aa": "bb"}
+EXECUTION = {"ccc": "ddd"}
+PROJECT_ID = "airflow-testing"
+METADATA = ()
+TIMEOUT = None
+RETRY = None
+FILTER_ = "aaaa"
+ORDER_BY = "bbb"
+UPDATE_MASK = "aaa,bbb"
+
+WORKFLOW_PARENT = f"projects/{PROJECT_ID}/locations/{LOCATION}"
+WORKFLOW_NAME = f"projects/{PROJECT_ID}/locations/{LOCATION}/workflows/{WORKFLOW_ID}"
+EXECUTION_PARENT = f"projects/{PROJECT_ID}/locations/{LOCATION}/workflows/{WORKFLOW_ID}"
+EXECUTION_NAME = (
+    f"projects/{PROJECT_ID}/locations/{LOCATION}/workflows/{WORKFLOW_ID}/executions/{EXECUTION_ID}"
+)
+
+
+def mock_init(*args, **kwargs):
+    pass
+
+
+class TestWorkflowsHook:
+    def setup_method(self, _):
+        with mock.patch(BASE_PATH.format("GoogleBaseHook.__init__"), new=mock_init):
+            self.hook = WorkflowsHook(gcp_conn_id="test")  # pylint: disable=attribute-defined-outside-init
+
+    @mock.patch(BASE_PATH.format("WorkflowsHook._get_credentials"))
+    @mock.patch(BASE_PATH.format("WorkflowsHook.client_info"), new_callable=mock.PropertyMock)
+    @mock.patch(BASE_PATH.format("WorkflowsClient"))
+    def test_get_workflows_client(self, mock_client, mock_client_info, mock_get_credentials):
+        self.hook.get_workflows_client()
+        mock_client.assert_called_once_with(
+            credentials=mock_get_credentials.return_value,
+            client_info=mock_client_info.return_value,
+        )
+
+    @mock.patch(BASE_PATH.format("WorkflowsHook._get_credentials"))
+    @mock.patch(BASE_PATH.format("WorkflowsHook.client_info"), new_callable=mock.PropertyMock)
+    @mock.patch(BASE_PATH.format("ExecutionsClient"))
+    def test_get_executions_client(self, mock_client, mock_client_info, mock_get_credentials):
+        self.hook.get_executions_client()
+        mock_client.assert_called_once_with(
+            credentials=mock_get_credentials.return_value,
+            client_info=mock_client_info.return_value,
+        )
+
+    @mock.patch(BASE_PATH.format("WorkflowsHook.get_workflows_client"))
+    def test_create_workflow(self, mock_client):
+        result = self.hook.create_workflow(
+            workflow=WORKFLOW,
+            workflow_id=WORKFLOW_ID,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+        assert mock_client.return_value.create_workflow.return_value == result
+        mock_client.return_value.create_workflow.assert_called_once_with(
+            request=dict(workflow=WORKFLOW, workflow_id=WORKFLOW_ID, parent=WORKFLOW_PARENT),
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+    @mock.patch(BASE_PATH.format("WorkflowsHook.get_workflows_client"))
+    def test_get_workflow(self, mock_client):
+        result = self.hook.get_workflow(
+            workflow_id=WORKFLOW_ID,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+        assert mock_client.return_value.get_workflow.return_value == result
+        mock_client.return_value.get_workflow.assert_called_once_with(
+            request=dict(name=WORKFLOW_NAME),
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+    @mock.patch(BASE_PATH.format("WorkflowsHook.get_workflows_client"))
+    def test_update_workflow(self, mock_client):
+        result = self.hook.update_workflow(
+            workflow=WORKFLOW,
+            update_mask=UPDATE_MASK,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+        assert mock_client.return_value.update_workflow.return_value == result
+        mock_client.return_value.update_workflow.assert_called_once_with(
+            request=dict(
+                workflow=WORKFLOW,
+                update_mask=UPDATE_MASK,
+            ),
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+    @mock.patch(BASE_PATH.format("WorkflowsHook.get_workflows_client"))
+    def test_delete_workflow(self, mock_client):
+        result = self.hook.delete_workflow(
+            workflow_id=WORKFLOW_ID,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+        assert mock_client.return_value.delete_workflow.return_value == result
+        mock_client.return_value.delete_workflow.assert_called_once_with(
+            request=dict(name=WORKFLOW_NAME),
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+    @mock.patch(BASE_PATH.format("WorkflowsHook.get_workflows_client"))
+    def test_list_workflows(self, mock_client):
+        result = self.hook.list_workflows(
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            filter_=FILTER_,
+            order_by=ORDER_BY,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+        assert mock_client.return_value.list_workflows.return_value == result
+        mock_client.return_value.list_workflows.assert_called_once_with(
+            request=dict(
+                parent=WORKFLOW_PARENT,
+                filter=FILTER_,
+                order_by=ORDER_BY,
+            ),
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+    @mock.patch(BASE_PATH.format("WorkflowsHook.get_executions_client"))
+    def test_create_execution(self, mock_client):
+        result = self.hook.create_execution(
+            workflow_id=WORKFLOW_ID,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            execution=EXECUTION,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+        assert mock_client.return_value.create_execution.return_value == result
+        mock_client.return_value.create_execution.assert_called_once_with(
+            request=dict(
+                parent=EXECUTION_PARENT,
+                execution=EXECUTION,
+            ),
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+    @mock.patch(BASE_PATH.format("WorkflowsHook.get_executions_client"))
+    def test_get_execution(self, mock_client):
+        result = self.hook.get_execution(
+            workflow_id=WORKFLOW_ID,
+            execution_id=EXECUTION_ID,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+        assert mock_client.return_value.get_execution.return_value == result
+        mock_client.return_value.get_execution.assert_called_once_with(
+            request=dict(name=EXECUTION_NAME),
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+    @mock.patch(BASE_PATH.format("WorkflowsHook.get_executions_client"))
+    def test_cancel_execution(self, mock_client):
+        result = self.hook.cancel_execution(
+            workflow_id=WORKFLOW_ID,
+            execution_id=EXECUTION_ID,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+        assert mock_client.return_value.cancel_execution.return_value == result
+        mock_client.return_value.cancel_execution.assert_called_once_with(
+            request=dict(name=EXECUTION_NAME),
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+    @mock.patch(BASE_PATH.format("WorkflowsHook.get_executions_client"))
+    def test_list_execution(self, mock_client):
+        result = self.hook.list_executions(
+            workflow_id=WORKFLOW_ID,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+        assert mock_client.return_value.list_executions.return_value == result
+        mock_client.return_value.list_executions.assert_called_once_with(
+            request=dict(parent=EXECUTION_PARENT),
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
diff --git a/tests/providers/google/cloud/operators/test_workflows.py b/tests/providers/google/cloud/operators/test_workflows.py
new file mode 100644
index 0000000..5578548
--- /dev/null
+++ b/tests/providers/google/cloud/operators/test_workflows.py
@@ -0,0 +1,383 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import datetime
+from unittest import mock
+
+import pytz
+
+from airflow.providers.google.cloud.operators.workflows import (
+    WorkflowsCancelExecutionOperator,
+    WorkflowsCreateExecutionOperator,
+    WorkflowsCreateWorkflowOperator,
+    WorkflowsDeleteWorkflowOperator,
+    WorkflowsGetExecutionOperator,
+    WorkflowsGetWorkflowOperator,
+    WorkflowsListExecutionsOperator,
+    WorkflowsListWorkflowsOperator,
+    WorkflowsUpdateWorkflowOperator,
+)
+
+BASE_PATH = "airflow.providers.google.cloud.operators.workflows.{}"
+LOCATION = "europe-west1"
+WORKFLOW_ID = "workflow_id"
+EXECUTION_ID = "execution_id"
+WORKFLOW = {"aa": "bb"}
+EXECUTION = {"ccc": "ddd"}
+PROJECT_ID = "airflow-testing"
+METADATA = None
+TIMEOUT = None
+RETRY = None
+FILTER_ = "aaaa"
+ORDER_BY = "bbb"
+UPDATE_MASK = "aaa,bbb"
+GCP_CONN_ID = "test-conn"
+IMPERSONATION_CHAIN = None
+
+
+class TestWorkflowsCreateWorkflowOperator:
+    @mock.patch(BASE_PATH.format("Workflow"))
+    @mock.patch(BASE_PATH.format("WorkflowsHook"))
+    def test_execute(self, mock_hook, mock_object):
+        op = WorkflowsCreateWorkflowOperator(
+            task_id="test_task",
+            workflow=WORKFLOW,
+            workflow_id=WORKFLOW_ID,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        result = op.execute({})
+
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+
+        mock_hook.return_value.create_workflow.assert_called_once_with(
+            workflow=WORKFLOW,
+            workflow_id=WORKFLOW_ID,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+        assert result == mock_object.to_dict.return_value
+
+
+class TestWorkflowsUpdateWorkflowOperator:
+    @mock.patch(BASE_PATH.format("Workflow"))
+    @mock.patch(BASE_PATH.format("WorkflowsHook"))
+    def test_execute(self, mock_hook, mock_object):
+        op = WorkflowsUpdateWorkflowOperator(
+            task_id="test_task",
+            workflow_id=WORKFLOW_ID,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            update_mask=UPDATE_MASK,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        result = op.execute({})
+
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+
+        mock_hook.return_value.get_workflow.assert_called_once_with(
+            workflow_id=WORKFLOW_ID,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+        mock_hook.return_value.update_workflow.assert_called_once_with(
+            workflow=mock_hook.return_value.get_workflow.return_value,
+            update_mask=UPDATE_MASK,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+        assert result == mock_object.to_dict.return_value
+
+
+class TestWorkflowsDeleteWorkflowOperator:
+    @mock.patch(BASE_PATH.format("WorkflowsHook"))
+    def test_execute(
+        self,
+        mock_hook,
+    ):
+        op = WorkflowsDeleteWorkflowOperator(
+            task_id="test_task",
+            workflow_id=WORKFLOW_ID,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        op.execute({})
+
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+
+        mock_hook.return_value.delete_workflow.assert_called_once_with(
+            workflow_id=WORKFLOW_ID,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+
+class TestWorkflowsListWorkflowsOperator:
+    @mock.patch(BASE_PATH.format("Workflow"))
+    @mock.patch(BASE_PATH.format("WorkflowsHook"))
+    def test_execute(self, mock_hook, mock_object):
+        workflow_mock = mock.MagicMock()
+        workflow_mock.start_time = datetime.datetime.now(tz=pytz.UTC) + datetime.timedelta(minutes=5)
+        mock_hook.return_value.list_workflows.return_value = [workflow_mock]
+
+        op = WorkflowsListWorkflowsOperator(
+            task_id="test_task",
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            filter_=FILTER_,
+            order_by=ORDER_BY,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        result = op.execute({})
+
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+
+        mock_hook.return_value.list_workflows.assert_called_once_with(
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            filter_=FILTER_,
+            order_by=ORDER_BY,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+        assert result == [mock_object.to_dict.return_value]
+
+
+class TestWorkflowsGetWorkflowOperator:
+    @mock.patch(BASE_PATH.format("Workflow"))
+    @mock.patch(BASE_PATH.format("WorkflowsHook"))
+    def test_execute(self, mock_hook, mock_object):
+        op = WorkflowsGetWorkflowOperator(
+            task_id="test_task",
+            workflow_id=WORKFLOW_ID,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        result = op.execute({})
+
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+
+        mock_hook.return_value.get_workflow.assert_called_once_with(
+            workflow_id=WORKFLOW_ID,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+        assert result == mock_object.to_dict.return_value
+
+
+class TestWorkflowExecutionsCreateExecutionOperator:
+    @mock.patch(BASE_PATH.format("Execution"))
+    @mock.patch(BASE_PATH.format("WorkflowsHook"))
+    @mock.patch(BASE_PATH.format("WorkflowsCreateExecutionOperator.xcom_push"))
+    def test_execute(self, mock_xcom, mock_hook, mock_object):
+        mock_hook.return_value.create_execution.return_value.name = "name/execution_id"
+        op = WorkflowsCreateExecutionOperator(
+            task_id="test_task",
+            workflow_id=WORKFLOW_ID,
+            execution=EXECUTION,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        result = op.execute({})
+
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+
+        mock_hook.return_value.create_execution.assert_called_once_with(
+            workflow_id=WORKFLOW_ID,
+            execution=EXECUTION,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+        mock_xcom.assert_called_once_with({}, key="execution_id", value="execution_id")
+        assert result == mock_object.to_dict.return_value
+
+
+class TestWorkflowExecutionsCancelExecutionOperator:
+    @mock.patch(BASE_PATH.format("Execution"))
+    @mock.patch(BASE_PATH.format("WorkflowsHook"))
+    def test_execute(self, mock_hook, mock_object):
+        op = WorkflowsCancelExecutionOperator(
+            task_id="test_task",
+            workflow_id=WORKFLOW_ID,
+            execution_id=EXECUTION_ID,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        result = op.execute({})
+
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+
+        mock_hook.return_value.cancel_execution.assert_called_once_with(
+            workflow_id=WORKFLOW_ID,
+            execution_id=EXECUTION_ID,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+        assert result == mock_object.to_dict.return_value
+
+
+class TestWorkflowExecutionsListExecutionsOperator:
+    @mock.patch(BASE_PATH.format("Execution"))
+    @mock.patch(BASE_PATH.format("WorkflowsHook"))
+    def test_execute(self, mock_hook, mock_object):
+        execution_mock = mock.MagicMock()
+        execution_mock.start_time = datetime.datetime.now(tz=pytz.UTC) + datetime.timedelta(minutes=5)
+        mock_hook.return_value.list_executions.return_value = [execution_mock]
+
+        op = WorkflowsListExecutionsOperator(
+            task_id="test_task",
+            workflow_id=WORKFLOW_ID,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        result = op.execute({})
+
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+
+        mock_hook.return_value.list_executions.assert_called_once_with(
+            workflow_id=WORKFLOW_ID,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+        assert result == [mock_object.to_dict.return_value]
+
+
+class TestWorkflowExecutionsGetExecutionOperator:
+    @mock.patch(BASE_PATH.format("Execution"))
+    @mock.patch(BASE_PATH.format("WorkflowsHook"))
+    def test_execute(self, mock_hook, mock_object):
+        op = WorkflowsGetExecutionOperator(
+            task_id="test_task",
+            workflow_id=WORKFLOW_ID,
+            execution_id=EXECUTION_ID,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        result = op.execute({})
+
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+
+        mock_hook.return_value.get_execution.assert_called_once_with(
+            workflow_id=WORKFLOW_ID,
+            execution_id=EXECUTION_ID,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+        assert result == mock_object.to_dict.return_value
diff --git a/tests/providers/google/cloud/operators/test_workflows_system.py b/tests/providers/google/cloud/operators/test_workflows_system.py
new file mode 100644
index 0000000..0a768ed
--- /dev/null
+++ b/tests/providers/google/cloud/operators/test_workflows_system.py
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pytest
+
+from tests.providers.google.cloud.utils.gcp_authenticator import GCP_WORKFLOWS_KEY
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
+
+
+@pytest.mark.system("google.cloud")
+@pytest.mark.credential_file(GCP_WORKFLOWS_KEY)
+class CloudWorkflowsExampleDagsSystemTest(GoogleSystemTest):
+    @provide_gcp_context(GCP_WORKFLOWS_KEY)
+    def test_run_example_workflow_dag(self):
+        self.run_dag('example_cloud_workflows', CLOUD_DAG_FOLDER)
diff --git a/tests/providers/google/cloud/sensors/test_workflows.py b/tests/providers/google/cloud/sensors/test_workflows.py
new file mode 100644
index 0000000..56ad958
--- /dev/null
+++ b/tests/providers/google/cloud/sensors/test_workflows.py
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import mock
+
+import pytest
+from google.cloud.workflows.executions_v1beta import Execution
+
+from airflow.exceptions import AirflowException
+from airflow.providers.google.cloud.sensors.workflows import WorkflowExecutionSensor
+
+BASE_PATH = "airflow.providers.google.cloud.sensors.workflows.{}"
+LOCATION = "europe-west1"
+WORKFLOW_ID = "workflow_id"
+EXECUTION_ID = "execution_id"
+PROJECT_ID = "airflow-testing"
+METADATA = None
+TIMEOUT = None
+RETRY = None
+GCP_CONN_ID = "test-conn"
+IMPERSONATION_CHAIN = None
+
+
+class TestWorkflowExecutionSensor:
+    @mock.patch(BASE_PATH.format("WorkflowsHook"))
+    def test_poke_success(self, mock_hook):
+        mock_hook.return_value.get_execution.return_value = mock.MagicMock(state=Execution.State.SUCCEEDED)
+        op = WorkflowExecutionSensor(
+            task_id="test_task",
+            workflow_id=WORKFLOW_ID,
+            execution_id=EXECUTION_ID,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            retry=RETRY,
+            request_timeout=TIMEOUT,
+            metadata=METADATA,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        result = op.poke({})
+
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+
+        mock_hook.return_value.get_execution.assert_called_once_with(
+            workflow_id=WORKFLOW_ID,
+            execution_id=EXECUTION_ID,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+        assert result is True
+
+    @mock.patch(BASE_PATH.format("WorkflowsHook"))
+    def test_poke_wait(self, mock_hook):
+        mock_hook.return_value.get_execution.return_value = mock.MagicMock(state=Execution.State.ACTIVE)
+        op = WorkflowExecutionSensor(
+            task_id="test_task",
+            workflow_id=WORKFLOW_ID,
+            execution_id=EXECUTION_ID,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            retry=RETRY,
+            request_timeout=TIMEOUT,
+            metadata=METADATA,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        result = op.poke({})
+
+        assert result is False
+
+    @mock.patch(BASE_PATH.format("WorkflowsHook"))
+    def test_poke_failure(self, mock_hook):
+        mock_hook.return_value.get_execution.return_value = mock.MagicMock(state=Execution.State.FAILED)
+        op = WorkflowExecutionSensor(
+            task_id="test_task",
+            workflow_id=WORKFLOW_ID,
+            execution_id=EXECUTION_ID,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            retry=RETRY,
+            request_timeout=TIMEOUT,
+            metadata=METADATA,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        with pytest.raises(AirflowException):
+            op.poke({})
diff --git a/tests/providers/google/cloud/utils/gcp_authenticator.py b/tests/providers/google/cloud/utils/gcp_authenticator.py
index 8aa55b8..8cf222e 100644
--- a/tests/providers/google/cloud/utils/gcp_authenticator.py
+++ b/tests/providers/google/cloud/utils/gcp_authenticator.py
@@ -54,6 +54,7 @@ GCP_SECRET_MANAGER_KEY = 'gcp_secret_manager.json'
 GCP_SPANNER_KEY = 'gcp_spanner.json'
 GCP_STACKDRIVER = 'gcp_stackdriver.json'
 GCP_TASKS_KEY = 'gcp_tasks.json'
+GCP_WORKFLOWS_KEY = 'gcp_workflows.json'
 GMP_KEY = 'gmp.json'
 G_FIREBASE_KEY = 'g_firebase.json'
 GCP_AWS_KEY = 'gcp_aws.json'