Posted to commits@airflow.apache.org by di...@apache.org on 2020/06/19 17:31:44 UTC

[airflow] branch v1-10-test updated: Add 'main' param to template_fields in DataprocSubmitPySparkJobOperator (#9154)

This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-test by this push:
     new 4950aaa  Add 'main' param to template_fields in DataprocSubmitPySparkJobOperator (#9154)
4950aaa is described below

commit 4950aaa8e9e16843c012a66de418717da4a2c351
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Jun 5 15:39:42 2020 +0100

    Add 'main' param to template_fields in DataprocSubmitPySparkJobOperator (#9154)
    
    (cherry picked from commit 9bcdadaf7e6e73d3d2246fbbd32a9f30a1b43ca9)
---
 .../providers/google/cloud/operators/dataproc.py   | 1708 ++++++++++++++++++++
 1 file changed, 1708 insertions(+)

diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
new file mode 100644
index 0000000..8c4a002
--- /dev/null
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -0,0 +1,1708 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""
+This module contains Google Dataproc operators.
+"""
+# pylint: disable=C0302
+
+import inspect
+import ntpath
+import os
+import re
+import time
+import uuid
+import warnings
+from datetime import datetime, timedelta
+from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
+
+from google.api_core.exceptions import AlreadyExists
+from google.api_core.retry import Retry
+from google.cloud.dataproc_v1beta2.types import (  # pylint: disable=no-name-in-module
+    Cluster, Duration, FieldMask,
+)
+from google.protobuf.json_format import MessageToDict
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.hooks.dataproc import DataprocHook, DataProcJobBuilder
+from airflow.providers.google.cloud.hooks.gcs import GCSHook
+from airflow.utils import timezone
+from airflow.utils.decorators import apply_defaults
+from airflow.version import version as airflow_version
+
+
+# pylint: disable=too-many-instance-attributes
+class ClusterGenerator:
+    """
+    Create a new Dataproc Cluster.
+
+    :param cluster_name: The name of the DataProc cluster to create. (templated)
+    :type cluster_name: str
+    :param project_id: The ID of the google cloud project in which
+        to create the cluster. (templated)
+    :type project_id: str
+    :param num_workers: The # of workers to spin up. If set to zero, the
+        cluster will be spun up in single-node mode.
+    :type num_workers: int
+    :param storage_bucket: The storage bucket to use; setting this to None lets Dataproc
+        generate a custom one for you
+    :type storage_bucket: str
+    :param init_actions_uris: List of GCS URIs containing
+        Dataproc initialization scripts
+    :type init_actions_uris: list[str]
+    :param init_action_timeout: Amount of time the executable scripts in
+        init_actions_uris have to complete
+    :type init_action_timeout: str
+    :param metadata: dict of key-value Google Compute Engine metadata entries
+        to add to all instances
+    :type metadata: dict
+    :param image_version: the version of software inside the Dataproc cluster
+    :type image_version: str
+    :param custom_image: custom Dataproc image; for more info see
+        https://cloud.google.com/dataproc/docs/guides/dataproc-images
+    :type custom_image: str
+    :param custom_image_project_id: project ID for the custom Dataproc image; for more info see
+        https://cloud.google.com/dataproc/docs/guides/dataproc-images
+    :type custom_image_project_id: str
+    :param autoscaling_policy: The autoscaling policy used by the cluster. Only resource names
+        that include the project ID and location (region) are valid. Example:
+        ``projects/[projectId]/locations/[dataproc_region]/autoscalingPolicies/[policy_id]``
+    :type autoscaling_policy: str
+    :param properties: dict of properties to set on
+        config files (e.g. spark-defaults.conf), see
+        https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters#SoftwareConfig
+    :type properties: dict
+    :param optional_components: List of optional cluster components, for more info see
+        https://cloud.google.com/dataproc/docs/reference/rest/v1/ClusterConfig#Component
+    :type optional_components: list[str]
+    :param num_masters: The # of master nodes to spin up
+    :type num_masters: int
+    :param master_machine_type: Compute engine machine type to use for the master node
+    :type master_machine_type: str
+    :param master_disk_type: Type of the boot disk for the master node
+        (default is ``pd-standard``).
+        Valid values: ``pd-ssd`` (Persistent Disk Solid State Drive) or
+        ``pd-standard`` (Persistent Disk Hard Disk Drive).
+    :type master_disk_type: str
+    :param master_disk_size: Disk size for the master node
+    :type master_disk_size: int
+    :param worker_machine_type: Compute engine machine type to use for the worker nodes
+    :type worker_machine_type: str
+    :param worker_disk_type: Type of the boot disk for the worker node
+        (default is ``pd-standard``).
+        Valid values: ``pd-ssd`` (Persistent Disk Solid State Drive) or
+        ``pd-standard`` (Persistent Disk Hard Disk Drive).
+    :type worker_disk_type: str
+    :param worker_disk_size: Disk size for the worker nodes
+    :type worker_disk_size: int
+    :param num_preemptible_workers: The # of preemptible worker nodes to spin up
+    :type num_preemptible_workers: int
+    :param labels: dict of labels to add to the cluster
+    :type labels: dict
+    :param zone: The zone where the cluster will be located. Set to None to auto-zone. (templated)
+    :type zone: str
+    :param network_uri: The network URI to be used for machine communication; cannot be
+        specified together with subnetwork_uri
+    :type network_uri: str
+    :param subnetwork_uri: The subnetwork URI to be used for machine communication;
+        cannot be specified together with network_uri
+    :type subnetwork_uri: str
+    :param internal_ip_only: If true, all instances in the cluster will only
+        have internal IP addresses. This can only be enabled for subnetwork-enabled
+        networks
+    :type internal_ip_only: bool
+    :param tags: The GCE tags to add to all instances
+    :type tags: list[str]
+    :param region: leave as 'global'; might become relevant in the future. (templated)
+    :type region: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param service_account: The service account of the dataproc instances.
+    :type service_account: str
+    :param service_account_scopes: The URIs of service account scopes to be included.
+    :type service_account_scopes: list[str]
+    :param idle_delete_ttl: The longest duration that the cluster will stay alive while
+        idle. Exceeding this threshold causes the cluster to be auto-deleted.
+        A duration in seconds.
+    :type idle_delete_ttl: int
+    :param auto_delete_time: The time when the cluster will be auto-deleted.
+    :type auto_delete_time: datetime.datetime
+    :param auto_delete_ttl: The life duration of the cluster: the cluster will be
+        auto-deleted at the end of this duration.
+        A duration in seconds. (If auto_delete_time is set, this parameter is ignored.)
+    :type auto_delete_ttl: int
+    :param customer_managed_key: The customer-managed key used for disk encryption
+        ``projects/[PROJECT_STORING_KEYS]/locations/[LOCATION]/keyRings/[KEY_RING_NAME]/cryptoKeys/[KEY_NAME]`` # noqa # pylint: disable=line-too-long
+    :type customer_managed_key: str
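+
+    **Example** (a minimal sketch of building a cluster dict for
+    ``DataprocCreateClusterOperator``; all names and sizes below are illustrative): ::
+
+        generator = ClusterGenerator(
+            project_id='my-project',
+            cluster_name='cluster-1',
+            num_workers=2,
+            zone='europe-west1-b',
+            storage_bucket='my-dataproc-staging-bucket',
+            master_machine_type='n1-standard-4',
+            worker_machine_type='n1-standard-4',
+            worker_disk_size=100)
+        cluster_dict = generator.make()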
+    """
+    # pylint: disable=too-many-arguments,too-many-locals
+    def __init__(self,
+                 project_id: Optional[str] = None,
+                 cluster_name: Optional[str] = None,
+                 num_workers: Optional[int] = None,
+                 zone: Optional[str] = None,
+                 network_uri: Optional[str] = None,
+                 subnetwork_uri: Optional[str] = None,
+                 internal_ip_only: Optional[bool] = None,
+                 tags: Optional[List[str]] = None,
+                 storage_bucket: Optional[str] = None,
+                 init_actions_uris: Optional[List[str]] = None,
+                 init_action_timeout: str = "10m",
+                 metadata: Optional[Dict] = None,
+                 custom_image: Optional[str] = None,
+                 custom_image_project_id: Optional[str] = None,
+                 image_version: Optional[str] = None,
+                 autoscaling_policy: Optional[str] = None,
+                 properties: Optional[Dict] = None,
+                 optional_components: Optional[List[str]] = None,
+                 num_masters: int = 1,
+                 master_machine_type: str = 'n1-standard-4',
+                 master_disk_type: str = 'pd-standard',
+                 master_disk_size: int = 1024,
+                 worker_machine_type: str = 'n1-standard-4',
+                 worker_disk_type: str = 'pd-standard',
+                 worker_disk_size: int = 1024,
+                 num_preemptible_workers: int = 0,
+                 labels: Optional[Dict] = None,
+                 region: Optional[str] = None,
+                 service_account: Optional[str] = None,
+                 service_account_scopes: Optional[List[str]] = None,
+                 idle_delete_ttl: Optional[int] = None,
+                 auto_delete_time: Optional[datetime] = None,
+                 auto_delete_ttl: Optional[int] = None,
+                 customer_managed_key: Optional[str] = None,
+                 *args,  # just in case
+                 **kwargs
+                 ) -> None:
+
+        self.cluster_name = cluster_name
+        self.project_id = project_id
+        self.region = region
+        self.num_masters = num_masters
+        self.num_workers = num_workers
+        self.num_preemptible_workers = num_preemptible_workers
+        self.storage_bucket = storage_bucket
+        self.init_actions_uris = init_actions_uris
+        self.init_action_timeout = init_action_timeout
+        self.metadata = metadata
+        self.custom_image = custom_image
+        self.custom_image_project_id = custom_image_project_id
+        self.image_version = image_version
+        self.properties = properties or dict()
+        self.optional_components = optional_components
+        self.master_machine_type = master_machine_type
+        self.master_disk_type = master_disk_type
+        self.master_disk_size = master_disk_size
+        self.autoscaling_policy = autoscaling_policy
+        self.worker_machine_type = worker_machine_type
+        self.worker_disk_type = worker_disk_type
+        self.worker_disk_size = worker_disk_size
+        self.labels = labels
+        self.zone = zone
+        self.network_uri = network_uri
+        self.subnetwork_uri = subnetwork_uri
+        self.internal_ip_only = internal_ip_only
+        self.tags = tags
+        self.service_account = service_account
+        self.service_account_scopes = service_account_scopes
+        self.idle_delete_ttl = idle_delete_ttl
+        self.auto_delete_time = auto_delete_time
+        self.auto_delete_ttl = auto_delete_ttl
+        self.customer_managed_key = customer_managed_key
+        self.single_node = num_workers == 0
+
+        if self.custom_image and self.image_version:
+            raise ValueError("The custom_image and image_version can't be both set")
+
+        if self.single_node and self.num_preemptible_workers > 0:
+            raise ValueError("Single node cannot have preemptible workers.")
+
+    def _get_init_action_timeout(self):
+        match = re.match(r"^(\d+)([sm])$", self.init_action_timeout)
+        if match:
+            if match.group(2) == "s":
+                return self.init_action_timeout
+            elif match.group(2) == "m":
+                val = float(match.group(1))
+                return "{}s".format(int(timedelta(minutes=val).total_seconds()))
+
+        raise AirflowException(
+            "DataprocClusterCreateOperator init_action_timeout"
+            " should be expressed in minutes or seconds. i.e. 10m, 30s")
+
+    def _build_gce_cluster_config(self, cluster_data):
+        if self.zone:
+            zone_uri = \
+                'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
+                    self.project_id, self.zone
+                )
+            cluster_data['config']['gce_cluster_config']['zone_uri'] = zone_uri
+
+        if self.metadata:
+            cluster_data['config']['gce_cluster_config']['metadata'] = self.metadata
+
+        if self.network_uri:
+            cluster_data['config']['gce_cluster_config']['network_uri'] = self.network_uri
+
+        if self.subnetwork_uri:
+            cluster_data['config']['gce_cluster_config']['subnetwork_uri'] = \
+                self.subnetwork_uri
+
+        if self.internal_ip_only:
+            if not self.subnetwork_uri:
+                raise AirflowException("Set internal_ip_only to true only when"
+                                       " you pass a subnetwork_uri.")
+            cluster_data['config']['gce_cluster_config']['internal_ip_only'] = True
+
+        if self.tags:
+            cluster_data['config']['gce_cluster_config']['tags'] = self.tags
+
+        if self.service_account:
+            cluster_data['config']['gce_cluster_config']['service_account'] = \
+                self.service_account
+
+        if self.service_account_scopes:
+            cluster_data['config']['gce_cluster_config']['service_account_scopes'] = \
+                self.service_account_scopes
+
+        return cluster_data
+
+    def _build_lifecycle_config(self, cluster_data):
+        if self.idle_delete_ttl:
+            cluster_data['config']['lifecycle_config']['idle_delete_ttl'] = \
+                "{}s".format(self.idle_delete_ttl)
+
+        if self.auto_delete_time:
+            utc_auto_delete_time = timezone.convert_to_utc(self.auto_delete_time)
+            cluster_data['config']['lifecycle_config']['auto_delete_time'] = \
+                utc_auto_delete_time.format('%Y-%m-%dT%H:%M:%S.%fZ', formatter='classic')
+        elif self.auto_delete_ttl:
+            cluster_data['config']['lifecycle_config']['auto_delete_ttl'] = \
+                "{}s".format(self.auto_delete_ttl)
+
+        return cluster_data
+
+    def _build_cluster_data(self):
+        if self.zone:
+            master_type_uri = \
+                "https://www.googleapis.com/compute/v1/projects/{}/zones/{}/machineTypes/{}".format(
+                    self.project_id, self.zone, self.master_machine_type)
+            worker_type_uri = \
+                "https://www.googleapis.com/compute/v1/projects/{}/zones/{}/machineTypes/{}".format(
+                    self.project_id, self.zone, self.worker_machine_type)
+        else:
+            master_type_uri = self.master_machine_type
+            worker_type_uri = self.worker_machine_type
+
+        cluster_data = {
+            'project_id': self.project_id,
+            'cluster_name': self.cluster_name,
+            'config': {
+                'gce_cluster_config': {
+                },
+                'master_config': {
+                    'num_instances': self.num_masters,
+                    'machine_type_uri': master_type_uri,
+                    'disk_config': {
+                        'boot_disk_type': self.master_disk_type,
+                        'boot_disk_size_gb': self.master_disk_size
+                    }
+                },
+                'worker_config': {
+                    'num_instances': self.num_workers,
+                    'machine_type_uri': worker_type_uri,
+                    'disk_config': {
+                        'boot_disk_type': self.worker_disk_type,
+                        'boot_disk_size_gb': self.worker_disk_size
+                    }
+                },
+                'secondary_worker_config': {},
+                'software_config': {},
+                'lifecycle_config': {},
+                'encryption_config': {},
+                'autoscaling_config': {},
+            }
+        }
+        if self.num_preemptible_workers > 0:
+            cluster_data['config']['secondary_worker_config'] = {
+                'num_instances': self.num_preemptible_workers,
+                'machine_type_uri': worker_type_uri,
+                'disk_config': {
+                    'boot_disk_type': self.worker_disk_type,
+                    'boot_disk_size_gb': self.worker_disk_size
+                },
+                'is_preemptible': True
+            }
+
+        cluster_data['labels'] = self.labels or {}
+
+        # Dataproc labels must conform to the following regex:
+        # [a-z]([-a-z0-9]*[a-z0-9])? (current airflow version string follows
+        # semantic versioning spec: x.y.z).
+        cluster_data['labels'].update({
+            'airflow-version': 'v' + airflow_version.replace('.', '-').replace('+', '-')
+        })
+        if self.storage_bucket:
+            cluster_data['config']['config_bucket'] = self.storage_bucket
+
+        if self.image_version:
+            cluster_data['config']['software_config']['image_version'] = self.image_version
+
+        elif self.custom_image:
+            project_id = self.custom_image_project_id or self.project_id
+            custom_image_url = 'https://www.googleapis.com/compute/beta/projects/' \
+                               '{}/global/images/{}'.format(project_id,
+                                                            self.custom_image)
+            cluster_data['config']['master_config']['image_uri'] = custom_image_url
+            if not self.single_node:
+                cluster_data['config']['worker_config']['image_uri'] = custom_image_url
+
+        cluster_data = self._build_gce_cluster_config(cluster_data)
+
+        if self.single_node:
+            self.properties["dataproc:dataproc.allow.zero.workers"] = "true"
+
+        if self.properties:
+            cluster_data['config']['software_config']['properties'] = self.properties
+
+        if self.optional_components:
+            cluster_data['config']['software_config']['optional_components'] = self.optional_components
+
+        cluster_data = self._build_lifecycle_config(cluster_data)
+
+        if self.init_actions_uris:
+            init_actions_dict = [
+                {
+                    'executable_file': uri,
+                    'execution_timeout': self._get_init_action_timeout()
+                } for uri in self.init_actions_uris
+            ]
+            cluster_data['config']['initialization_actions'] = init_actions_dict
+
+        if self.customer_managed_key:
+            cluster_data['config']['encryption_config'] = \
+                {'gce_pd_kms_key_name': self.customer_managed_key}
+        if self.autoscaling_policy:
+            cluster_data['config']['autoscaling_config'] = {'policy_uri': self.autoscaling_policy}
+
+        return cluster_data
+
+    def make(self):
+        """
+        Helper method for easier migration.
+        :return: Dict representing Dataproc cluster.
+        """
+        return self._build_cluster_data()
+
+
+# pylint: disable=too-many-instance-attributes
+class DataprocCreateClusterOperator(BaseOperator):
+    """
+    Create a new cluster on Google Cloud Dataproc. The operator will wait until the
+    creation is successful or an error occurs in the creation process.
+
+    The parameters allow you to configure the cluster. Please refer to
+
+    https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters
+
+    for a detailed explanation of the different parameters. Most of the configuration
+    parameters detailed in the link are available as parameters to this operator.
+
+    :param project_id: The ID of the google cloud project in which
+        to create the cluster. (templated)
+    :type project_id: str
+    :param region: leave as 'global'; might become relevant in the future. (templated)
+    :type region: str
+    :param cluster: The cluster definition to create, as a dict. It can be built, for
+        example, with ``ClusterGenerator(...).make()``. (templated)
+    :type cluster: dict
+    :param request_id: Optional. A unique id used to identify the request. If the server receives two
+        ``CreateClusterRequest`` requests with the same id, then the second request will be ignored and the
+        first ``google.longrunning.Operation`` created and stored in the backend is returned.
+    :type request_id: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
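+
+    **Example** (an illustrative sketch; the project, region and cluster dict are
+    placeholders, and the dict can be built with ``ClusterGenerator(...).make()``): ::
+
+        create_cluster = DataprocCreateClusterOperator(
+            task_id='create_cluster',
+            project_id='my-project',
+            region='global',
+            cluster=cluster_dict,
+            dag=dag)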
+    """
+
+    template_fields = ('project_id', 'region', 'cluster')
+
+    @apply_defaults
+    def __init__(self,
+                 region: str = 'global',
+                 project_id: Optional[str] = None,
+                 cluster: Optional[Dict] = None,
+                 request_id: Optional[str] = None,
+                 retry: Optional[Retry] = None,
+                 timeout: Optional[float] = None,
+                 metadata: Optional[Sequence[Tuple[str, str]]] = None,
+                 gcp_conn_id: str = "google_cloud_default",
+                 *args,
+                 **kwargs) -> None:
+        # TODO: remove one day
+        if cluster is None:
+            warnings.warn(
+                "Passing cluster parameters by keywords to `{}` "
+                "will be deprecated. Please provide cluster object using `cluster` parameter. "
+                "You can use `airflow.dataproc.ClusterGenerator.generate_cluster` method to "
+                "obtain cluster object.".format(type(self).__name__),
+                DeprecationWarning, stacklevel=1
+            )
+            # Remove result of apply defaults
+            if 'params' in kwargs:
+                del kwargs['params']
+
+            # Create cluster object from kwargs
+            kwargs['region'] = region
+            kwargs['project_id'] = project_id
+            cluster = ClusterGenerator(**kwargs).make()
+
+            # Remove from kwargs cluster params passed for backward compatibility
+            cluster_params = inspect.signature(ClusterGenerator.__init__).parameters
+            for arg in cluster_params:
+                if arg in kwargs:
+                    del kwargs[arg]
+
+        super().__init__(*args, **kwargs)
+
+        self.cluster = cluster
+        self.cluster_name = cluster.get('cluster_name')
+        self.project_id = project_id
+        self.region = region
+        self.request_id = request_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+
+    def execute(self, context):
+        self.log.info('Creating cluster: %s', self.cluster_name)
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
+        try:
+            operation = hook.create_cluster(
+                project_id=self.project_id,
+                region=self.region,
+                cluster=self.cluster,
+                request_id=self.request_id,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            cluster = operation.result()
+            self.log.info("Cluster created.")
+        except AlreadyExists:
+            cluster = hook.get_cluster(
+                project_id=self.project_id,
+                region=self.region,
+                cluster_name=self.cluster_name,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            self.log.info("Cluster already exists.")
+        return MessageToDict(cluster)
+
+
+class DataprocScaleClusterOperator(BaseOperator):
+    """
+    Scale a cluster up or down on Google Cloud Dataproc.
+    The operator will wait until the cluster is re-scaled.
+
+    **Example**: ::
+
+        t1 = DataprocScaleClusterOperator(
+                task_id='dataproc_scale',
+                project_id='my-project',
+                cluster_name='cluster-1',
+                num_workers=10,
+                num_preemptible_workers=10,
+                graceful_decommission_timeout='1h',
+                dag=dag)
+
+    .. seealso::
+        For more detail about scaling clusters, have a look at the reference:
+        https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/scaling-clusters
+
+    :param cluster_name: The name of the cluster to scale. (templated)
+    :type cluster_name: str
+    :param project_id: The ID of the google cloud project in which
+        the cluster runs. (templated)
+    :type project_id: str
+    :param region: The region for the dataproc cluster. (templated)
+    :type region: str
+    :param num_workers: The new number of workers
+    :type num_workers: int
+    :param num_preemptible_workers: The new number of preemptible workers
+    :type num_preemptible_workers: int
+    :param graceful_decommission_timeout: Timeout for graceful YARN decommissioning.
+        Maximum value is 1d
+    :type graceful_decommission_timeout: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
+    """
+
+    template_fields = ['cluster_name', 'project_id', 'region']
+
+    @apply_defaults
+    def __init__(self,
+                 cluster_name: str,
+                 project_id: Optional[str] = None,
+                 region: str = 'global',
+                 num_workers: int = 2,
+                 num_preemptible_workers: int = 0,
+                 graceful_decommission_timeout: Optional[str] = None,
+                 gcp_conn_id: str = "google_cloud_default",
+                 *args,
+                 **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.cluster_name = cluster_name
+        self.num_workers = num_workers
+        self.num_preemptible_workers = num_preemptible_workers
+        self.graceful_decommission_timeout = graceful_decommission_timeout
+        self.gcp_conn_id = gcp_conn_id
+
+        # TODO: Remove one day
+        warnings.warn(
+            "The `{cls}` operator is deprecated, please use `DataprocUpdateClusterOperator` instead.".format(
+                cls=type(self).__name__
+            ),
+            DeprecationWarning,
+            stacklevel=1
+        )
+
+    def _build_scale_cluster_data(self):
+        scale_data = {
+            'config': {
+                'worker_config': {
+                    'num_instances': self.num_workers
+                },
+                'secondary_worker_config': {
+                    'num_instances': self.num_preemptible_workers
+                }
+            }
+        }
+        return scale_data
+
+    @property
+    def _graceful_decommission_timeout_object(self) -> Optional[Dict]:
+        if not self.graceful_decommission_timeout:
+            return None
+
+        timeout = None
+        match = re.match(r"^(\d+)([smdh])$", self.graceful_decommission_timeout)
+        if match:
+            if match.group(2) == "s":
+                timeout = int(match.group(1))
+            elif match.group(2) == "m":
+                val = float(match.group(1))
+                timeout = int(timedelta(minutes=val).total_seconds())
+            elif match.group(2) == "h":
+                val = float(match.group(1))
+                timeout = int(timedelta(hours=val).total_seconds())
+            elif match.group(2) == "d":
+                val = float(match.group(1))
+                timeout = int(timedelta(days=val).total_seconds())
+
+        if not timeout:
+            raise AirflowException(
+                "DataprocScaleClusterOperator graceful_decommission_timeout"
+                " should be expressed in days, hours, minutes or seconds,"
+                " e.g. 1d, 4h, 10m, 30s"
+            )
+
+        return {'seconds': timeout}
+
+    def execute(self, context):
+        """
+        Scale a cluster up or down on Google Cloud Dataproc.
+        """
+        self.log.info("Scaling cluster: %s", self.cluster_name)
+
+        scaling_cluster_data = self._build_scale_cluster_data()
+        update_mask = [
+            "config.worker_config.num_instances",
+            "config.secondary_worker_config.num_instances"
+        ]
+
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
+        operation = hook.update_cluster(
+            project_id=self.project_id,
+            location=self.region,
+            cluster_name=self.cluster_name,
+            cluster=scaling_cluster_data,
+            graceful_decommission_timeout=self._graceful_decommission_timeout_object,
+            update_mask={'paths': update_mask},
+        )
+        operation.result()
+        self.log.info("Cluster scaling finished")
+
+
+class DataprocDeleteClusterOperator(BaseOperator):
+    """
+    Deletes a cluster in a project.
+
+    :param project_id: Required. The ID of the Google Cloud Platform project that the cluster belongs to.
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param cluster_name: Required. The cluster name.
+    :type cluster_name: str
+    :param cluster_uuid: Optional. Specifying the ``cluster_uuid`` means the RPC should fail
+        if a cluster with the specified UUID does not exist.
+    :type cluster_uuid: str
+    :param request_id: Optional. A unique id used to identify the request. If the server receives two
+        ``DeleteClusterRequest`` requests with the same id, then the second request will be ignored and the
+        first ``google.longrunning.Operation`` created and stored in the backend is returned.
+    :type request_id: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
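+
+    **Example** (an illustrative sketch; project, region and cluster name are placeholders): ::
+
+        delete_cluster = DataprocDeleteClusterOperator(
+            task_id='delete_cluster',
+            project_id='my-project',
+            region='global',
+            cluster_name='cluster-1',
+            dag=dag)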
+    """
+
+    @apply_defaults
+    def __init__(
+        self,
+        project_id: str,
+        region: str,
+        cluster_name: str,
+        cluster_uuid: Optional[str] = None,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        *args,
+        **kwargs
+    ):
+        super().__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.cluster_name = cluster_name
+        self.cluster_uuid = cluster_uuid
+        self.request_id = request_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+
+    def execute(self, context: Dict):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
+        self.log.info("Deleting cluster: %s", self.cluster_name)
+        operation = hook.delete_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster_name=self.cluster_name,
+            cluster_uuid=self.cluster_uuid,
+            request_id=self.request_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        operation.result()
+        self.log.info("Cluster deleted.")
+
+
+class DataprocJobBaseOperator(BaseOperator):
+    """
+    The base class for operators that launch jobs on Dataproc.
+
+    :param job_name: The job name used in the DataProc cluster. This name by default
+        is the task_id appended with the execution date, but can be templated. The
+        name will always be appended with a random number to avoid name clashes.
+    :type job_name: str
+    :param cluster_name: The name of the DataProc cluster.
+    :type cluster_name: str
+    :param dataproc_properties: Map of job properties. Ideal to put in
+        default arguments (templated)
+    :type dataproc_properties: dict
+    :param dataproc_jars: HCFS URIs of jar files to add to the CLASSPATH of the job driver and
+        tasks, e.g. Hive SerDes and UDFs. (templated)
+    :type dataproc_jars: list
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param labels: The labels to associate with this job. Label keys must contain 1 to 63 characters,
+        and must conform to RFC 1035. Label values may be empty, but, if present, must contain 1 to 63
+        characters, and must conform to RFC 1035. No more than 32 labels can be associated with a job.
+    :type labels: dict
+    :param region: The specified region where the dataproc cluster is created.
+    :type region: str
+    :param job_error_states: Job states that should be considered error states.
+        Any states in this set will result in an error being raised and failure of the
+        task. E.g., if the ``CANCELLED`` state should also be considered a task failure,
+        pass in ``{'ERROR', 'CANCELLED'}``. Possible values are currently only
+        ``'ERROR'`` and ``'CANCELLED'``, but could change in the future. Defaults to
+        ``{'ERROR'}``.
+    :type job_error_states: set
+    :var dataproc_job_id: The actual "jobId" as submitted to the Dataproc API.
+        This is useful for identifying or linking to the job in the Google Cloud Console
+        Dataproc UI, as the actual "jobId" submitted to the Dataproc API is appended with
+        an 8 character random string.
+    :vartype dataproc_job_id: str
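+
+    **Example** (an illustrative sketch of sharing job parameters across the concrete
+    ``DataprocSubmit*JobOperator`` subclasses via the DAG's ``default_args``; all values
+    are placeholders): ::
+
+        default_args = {
+            'cluster_name': 'cluster-1',
+            'region': 'global',
+            'dataproc_jars': ['gs://example/jars/my-udfs.jar'],
+            'labels': {'team': 'data-eng'},
+        }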
+    """
+    job_type = ""
+
+    @apply_defaults
+    def __init__(self,
+                 job_name: str = '{{task.task_id}}_{{ds_nodash}}',
+                 cluster_name: str = "cluster-1",
+                 dataproc_properties: Optional[Dict] = None,
+                 dataproc_jars: Optional[List[str]] = None,
+                 gcp_conn_id: str = 'google_cloud_default',
+                 delegate_to: Optional[str] = None,
+                 labels: Optional[Dict] = None,
+                 region: str = 'global',
+                 job_error_states: Optional[Set[str]] = None,
+                 *args,
+                 **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.labels = labels
+        self.job_name = job_name
+        self.cluster_name = cluster_name
+        self.dataproc_properties = dataproc_properties
+        self.dataproc_jars = dataproc_jars
+        self.region = region
+        self.job_error_states = job_error_states if job_error_states is not None else {'ERROR'}
+
+        self.hook = DataprocHook(gcp_conn_id=gcp_conn_id)
+        self.project_id = self.hook.project_id
+        self.job_template = None
+        self.job = None
+        self.dataproc_job_id = None
+
+    def create_job_template(self):
+        """
+        Initialize `self.job_template` with default values
+        """
+        self.job_template = DataProcJobBuilder(
+            project_id=self.project_id,
+            task_id=self.task_id,
+            cluster_name=self.cluster_name,
+            job_type=self.job_type,
+            properties=self.dataproc_properties
+        )
+        self.job_template.set_job_name(self.job_name)
+        self.job_template.add_jar_file_uris(self.dataproc_jars)
+        self.job_template.add_labels(self.labels)
+
+    def _generate_job_template(self):
+        if self.job_template:
+            job = self.job_template.build()
+            return job['job']
+        raise Exception("Create a job template before")
+
+    def execute(self, context):
+        if self.job_template:
+            self.job = self.job_template.build()
+            self.dataproc_job_id = self.job["job"]["reference"]["job_id"]
+            self.log.info('Submitting %s job %s', self.job_type, self.dataproc_job_id)
+            job_object = self.hook.submit_job(
+                project_id=self.project_id,
+                job=self.job["job"],
+                location=self.region,
+            )
+            job_id = job_object.reference.job_id
+            self.hook.wait_for_job(
+                job_id=job_id,
+                location=self.region,
+                project_id=self.project_id
+            )
+            self.log.info('Job executed correctly.')
+        else:
+            raise AirflowException("Create a job template before")
+
+    def on_kill(self):
+        """
+        Callback called when the operator is killed.
+        Cancel any running job.
+        """
+        if self.dataproc_job_id:
+            self.hook.cancel_job(
+                project_id=self.project_id,
+                job_id=self.dataproc_job_id,
+                location=self.region
+            )
+
+
+class DataprocSubmitPigJobOperator(DataprocJobBaseOperator):
+    """
+    Start a Pig query Job on a Cloud DataProc cluster. The parameters of the operation
+    will be passed to the cluster.
+
+    It's a good practice to define dataproc_* parameters, such as the cluster name and
+    UDF jars, in the default_args of the DAG.
+
+    .. code-block:: python
+
+        default_args = {
+            'cluster_name': 'cluster-1',
+            'dataproc_jars': [
+                'gs://example/udf/jar/datafu/1.2.0/datafu.jar',
+                'gs://example/udf/jar/gpig/1.2/gpig.jar'
+            ]
+        }
+
+    You can pass a Pig script as a string or as a file reference. Use ``variables`` to pass
+    values to the Pig script that are resolved on the cluster, or use templating to resolve
+    parameters in the script itself.
+
+    **Example**: ::
+
+        t1 = DataprocSubmitPigJobOperator(
+                task_id='dataproc_pig',
+                query='a_pig_script.pig',
+                variables={'out': 'gs://example/output/{{ds}}'},
+                dag=dag)
+
+    .. seealso::
+        For more detail about job submission, have a look at the reference:
+        https://cloud.google.com/dataproc/reference/rest/v1/projects.regions.jobs
+
+    :param query: The query or reference to the query
+        file (pg or pig extension). (templated)
+    :type query: str
+    :param query_uri: The HCFS URI of the script that contains the Pig queries.
+    :type query_uri: str
+    :param variables: Map of named parameters for the query. (templated)
+    :type variables: dict
+    """
+    template_fields = ['query', 'variables', 'job_name', 'cluster_name',
+                       'region', 'dataproc_jars', 'dataproc_properties']
+    template_ext = ('.pg', '.pig',)
+    ui_color = '#0273d4'
+    job_type = 'pig_job'
+
+    @apply_defaults
+    def __init__(
+        self,
+        query: Optional[str] = None,
+        query_uri: Optional[str] = None,
+        variables: Optional[Dict] = None,
+        *args,
+        **kwargs
+    ) -> None:
+        # TODO: Remove one day
+        warnings.warn(
+            "The `{cls}` operator is deprecated, please use `DataprocSubmitJobOperator` instead. You can use"
+            " `generate_job` method of `{cls}` to generate dictionary representing your job"
+            " and use it with the new operator.".format(cls=type(self).__name__),
+            DeprecationWarning,
+            stacklevel=1
+        )
+
+        super().__init__(*args, **kwargs)
+        self.query = query
+        self.query_uri = query_uri
+        self.variables = variables
+
+    def generate_job(self):
+        """
+        Helper method for easier migration to `DataprocSubmitJobOperator`.
+        :return: Dict representing Dataproc job
+        """
+        self.create_job_template()
+
+        if self.query is None:
+            self.job_template.add_query_uri(self.query_uri)
+        else:
+            self.job_template.add_query(self.query)
+        self.job_template.add_variables(self.variables)
+        return self._generate_job_template()
+
+    def execute(self, context):
+        self.create_job_template()
+
+        if self.query is None:
+            self.job_template.add_query_uri(self.query_uri)
+        else:
+            self.job_template.add_query(self.query)
+        self.job_template.add_variables(self.variables)
+
+        super().execute(context)
+
+
+class DataprocSubmitHiveJobOperator(DataprocJobBaseOperator):
+    """
+    Start a Hive query Job on a Cloud DataProc cluster.
+
+    :param query: The query or reference to the query file (q extension).
+    :type query: str
+    :param query_uri: The HCFS URI of the script that contains the Hive queries.
+    :type query_uri: str
+    :param variables: Map of named parameters for the query.
+    :type variables: dict
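+
+    **Example** (an illustrative sketch; the query and cluster values are placeholders): ::
+
+        hive_task = DataprocSubmitHiveJobOperator(
+            task_id='dataproc_hive',
+            query='SHOW DATABASES;',
+            cluster_name='cluster-1',
+            region='global',
+            dag=dag)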
+    """
+    template_fields = ['query', 'variables', 'job_name', 'cluster_name',
+                       'region', 'dataproc_jars', 'dataproc_properties']
+    template_ext = ('.q', '.hql',)
+    ui_color = '#0273d4'
+    job_type = 'hive_job'
+
+    @apply_defaults
+    def __init__(
+        self,
+        query: Optional[str] = None,
+        query_uri: Optional[str] = None,
+        variables: Optional[Dict] = None,
+        *args,
+        **kwargs
+    ) -> None:
+        # TODO: Remove one day
+        warnings.warn(
+            "The `{cls}` operator is deprecated, please use `DataprocSubmitJobOperator` instead. You can use"
+            " `generate_job` method of `{cls}` to generate dictionary representing your job"
+            " and use it with the new operator.".format(cls=type(self).__name__),
+            DeprecationWarning,
+            stacklevel=1
+        )
+
+        super().__init__(*args, **kwargs)
+        self.query = query
+        self.query_uri = query_uri
+        self.variables = variables
+        if self.query is not None and self.query_uri is not None:
+            raise AirflowException('Only one of `query` and `query_uri` can be passed.')
+
+    def generate_job(self):
+        """
+        Helper method for easier migration to `DataprocSubmitJobOperator`.
+        :return: Dict representing Dataproc job
+        """
+        self.create_job_template()
+        if self.query is None:
+            self.job_template.add_query_uri(self.query_uri)
+        else:
+            self.job_template.add_query(self.query)
+        self.job_template.add_variables(self.variables)
+        return self._generate_job_template()
+
+    def execute(self, context):
+        self.create_job_template()
+        if self.query is None:
+            self.job_template.add_query_uri(self.query_uri)
+        else:
+            self.job_template.add_query(self.query)
+        self.job_template.add_variables(self.variables)
+
+        super().execute(context)
+
+
+class DataprocSubmitSparkSqlJobOperator(DataprocJobBaseOperator):
+    """
+    Start a Spark SQL query Job on a Cloud DataProc cluster.
+
+    :param query: The query or reference to the query file (q extension). (templated)
+    :type query: str
+    :param query_uri: The HCFS URI of the script that contains the SQL queries.
+    :type query_uri: str
+    :param variables: Map of named parameters for the query. (templated)
+    :type variables: dict
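+
+    **Example** (an illustrative sketch; the query and cluster values are placeholders): ::
+
+        spark_sql_task = DataprocSubmitSparkSqlJobOperator(
+            task_id='dataproc_spark_sql',
+            query='SHOW DATABASES;',
+            cluster_name='cluster-1',
+            region='global',
+            dag=dag)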
+    """
+    template_fields = ['query', 'variables', 'job_name', 'cluster_name',
+                       'region', 'dataproc_jars', 'dataproc_properties']
+    template_ext = ('.q',)
+    ui_color = '#0273d4'
+    job_type = 'spark_sql_job'
+
+    @apply_defaults
+    def __init__(
+        self,
+        query: Optional[str] = None,
+        query_uri: Optional[str] = None,
+        variables: Optional[Dict] = None,
+        *args,
+        **kwargs
+    ) -> None:
+        # TODO: Remove one day
+        warnings.warn(
+            "The `{cls}` operator is deprecated, please use `DataprocSubmitJobOperator` instead. You can use"
+            " `generate_job` method of `{cls}` to generate dictionary representing your job"
+            " and use it with the new operator.".format(cls=type(self).__name__),
+            DeprecationWarning,
+            stacklevel=1
+        )
+
+        super().__init__(*args, **kwargs)
+        self.query = query
+        self.query_uri = query_uri
+        self.variables = variables
+        if self.query is not None and self.query_uri is not None:
+            raise AirflowException('Only one of `query` and `query_uri` can be passed.')
+
+    def generate_job(self):
+        """
+        Helper method for easier migration to `DataprocSubmitJobOperator`.
+        :return: Dict representing Dataproc job
+        """
+        self.create_job_template()
+        if self.query is None:
+            self.job_template.add_query_uri(self.query_uri)
+        else:
+            self.job_template.add_query(self.query)
+        self.job_template.add_variables(self.variables)
+        return self._generate_job_template()
+
+    def execute(self, context):
+        self.create_job_template()
+        if self.query is None:
+            self.job_template.add_query_uri(self.query_uri)
+        else:
+            self.job_template.add_query(self.query)
+        self.job_template.add_variables(self.variables)
+
+        super().execute(context)
+
+
+class DataprocSubmitSparkJobOperator(DataprocJobBaseOperator):
+    """
+    Start a Spark Job on a Cloud DataProc cluster.
+
+    :param main_jar: The HCFS URI of the jar file that contains the main class
+        (use this or the main_class, not both together).
+    :type main_jar: str
+    :param main_class: Name of the job class. (use this or the main_jar, not both
+        together).
+    :type main_class: str
+    :param arguments: Arguments for the job. (templated)
+    :type arguments: list
+    :param archives: List of archived files that will be unpacked in the work
+        directory. Should be stored in Cloud Storage.
+    :type archives: list
+    :param files: List of files to be copied to the working directory
+    :type files: list
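+
+    **Example** (an illustrative sketch; the jar path and class below are the stock Spark
+    examples shipped on Dataproc images and may differ on your cluster): ::
+
+        spark_task = DataprocSubmitSparkJobOperator(
+            task_id='dataproc_spark',
+            main_class='org.apache.spark.examples.SparkPi',
+            dataproc_jars=['file:///usr/lib/spark/examples/jars/spark-examples.jar'],
+            arguments=['1000'],
+            cluster_name='cluster-1',
+            region='global',
+            dag=dag)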
+    """
+
+    template_fields = ['arguments', 'job_name', 'cluster_name',
+                       'region', 'dataproc_jars', 'dataproc_properties']
+    ui_color = '#0273d4'
+    job_type = 'spark_job'
+
+    @apply_defaults
+    def __init__(
+        self,
+        main_jar: Optional[str] = None,
+        main_class: Optional[str] = None,
+        arguments: Optional[List] = None,
+        archives: Optional[List] = None,
+        files: Optional[List] = None,
+        *args,
+        **kwargs
+    ) -> None:
+        # TODO: Remove one day
+        warnings.warn(
+            "The `{cls}` operator is deprecated, please use `DataprocSubmitJobOperator` instead. You can use"
+            " `generate_job` method of `{cls}` to generate dictionary representing your job"
+            " and use it with the new operator.".format(cls=type(self).__name__),
+            DeprecationWarning,
+            stacklevel=1
+        )
+
+        super().__init__(*args, **kwargs)
+        self.main_jar = main_jar
+        self.main_class = main_class
+        self.arguments = arguments
+        self.archives = archives
+        self.files = files
+
+    def generate_job(self):
+        """
+        Helper method for easier migration to `DataprocSubmitJobOperator`.
+        :return: Dict representing Dataproc job
+        """
+        self.create_job_template()
+        self.job_template.set_main(self.main_jar, self.main_class)
+        self.job_template.add_args(self.arguments)
+        self.job_template.add_archive_uris(self.archives)
+        self.job_template.add_file_uris(self.files)
+        return self._generate_job_template()
+
+    def execute(self, context):
+        self.create_job_template()
+        self.job_template.set_main(self.main_jar, self.main_class)
+        self.job_template.add_args(self.arguments)
+        self.job_template.add_archive_uris(self.archives)
+        self.job_template.add_file_uris(self.files)
+
+        super().execute(context)
+
+
+class DataprocSubmitHadoopJobOperator(DataprocJobBaseOperator):
+    """
+    Start a Hadoop Job on a Cloud DataProc cluster.
+
+    :param main_jar: The HCFS URI of the jar file containing the main class
+        (use this or the main_class, not both together).
+    :type main_jar: str
+    :param main_class: Name of the job class. (use this or the main_jar, not both
+        together).
+    :type main_class: str
+    :param arguments: Arguments for the job. (templated)
+    :type arguments: list
+    :param archives: List of archived files that will be unpacked in the work
+        directory. Should be stored in Cloud Storage.
+    :type archives: list
+    :param files: List of files to be copied to the working directory
+    :type files: list
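+
+    **Example** (an illustrative sketch; the jar path is the stock Hadoop examples jar on
+    Dataproc images and the GCS paths are placeholders): ::
+
+        hadoop_task = DataprocSubmitHadoopJobOperator(
+            task_id='dataproc_hadoop',
+            main_jar='file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar',
+            arguments=['wordcount', 'gs://example/input', 'gs://example/output/{{ds}}'],
+            cluster_name='cluster-1',
+            region='global',
+            dag=dag)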
+    """
+
+    template_fields = ['arguments', 'job_name', 'cluster_name',
+                       'region', 'dataproc_jars', 'dataproc_properties']
+    ui_color = '#0273d4'
+    job_type = 'hadoop_job'
+
+    @apply_defaults
+    def __init__(
+        self,
+        main_jar: Optional[str] = None,
+        main_class: Optional[str] = None,
+        arguments: Optional[List] = None,
+        archives: Optional[List] = None,
+        files: Optional[List] = None,
+        *args,
+        **kwargs
+    ) -> None:
+        # TODO: Remove one day
+        warnings.warn(
+            "The `{cls}` operator is deprecated, please use `DataprocSubmitJobOperator` instead. You can use"
+            " `generate_job` method of `{cls}` to generate dictionary representing your job"
+            " and use it with the new operator.".format(cls=type(self).__name__),
+            DeprecationWarning,
+            stacklevel=1
+        )
+
+        super().__init__(*args, **kwargs)
+        self.main_jar = main_jar
+        self.main_class = main_class
+        self.arguments = arguments
+        self.archives = archives
+        self.files = files
+
+    def generate_job(self):
+        """
+        Helper method for easier migration to `DataprocSubmitJobOperator`.
+        :return: Dict representing Dataproc job
+        """
+        self.create_job_template()
+        self.job_template.set_main(self.main_jar, self.main_class)
+        self.job_template.add_args(self.arguments)
+        self.job_template.add_archive_uris(self.archives)
+        self.job_template.add_file_uris(self.files)
+        return self._generate_job_template()
+
+    def execute(self, context):
+        self.create_job_template()
+        self.job_template.set_main(self.main_jar, self.main_class)
+        self.job_template.add_args(self.arguments)
+        self.job_template.add_archive_uris(self.archives)
+        self.job_template.add_file_uris(self.files)
+
+        super().execute(context)
+
+
+class DataprocSubmitPySparkJobOperator(DataprocJobBaseOperator):
+    """
+    Start a PySpark Job on a Cloud DataProc cluster.
+
+    :param main: [Required] The Hadoop Compatible Filesystem (HCFS) URI of the main
+            Python file to use as the driver. Must be a .py file. (templated)
+    :type main: str
+    :param arguments: Arguments for the job. (templated)
+    :type arguments: list
+    :param archives: List of archived files that will be unpacked in the work
+        directory. Should be stored in Cloud Storage.
+    :type archives: list
+    :param files: List of files to be copied to the working directory
+    :type files: list
+    :param pyfiles: List of Python files to pass to the PySpark framework.
+        Supported file types: .py, .egg, and .zip
+    :type pyfiles: list
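+
+    **Example** (an illustrative sketch; all GCS paths are placeholders. ``main`` and
+    ``arguments`` are template fields, so Jinja expressions such as ``{{ds}}`` are rendered): ::
+
+        pyspark_task = DataprocSubmitPySparkJobOperator(
+            task_id='dataproc_pyspark',
+            main='gs://example/pyspark/my_job.py',
+            arguments=['--run-date', '{{ds}}'],
+            pyfiles=['gs://example/pyspark/helpers.zip'],
+            cluster_name='cluster-1',
+            region='global',
+            dag=dag)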
+    """
+
+    template_fields = ['main', 'arguments', 'job_name', 'cluster_name',
+                       'region', 'dataproc_jars', 'dataproc_properties']
+    ui_color = '#0273d4'
+    job_type = 'pyspark_job'
+
+    @staticmethod
+    def _generate_temp_filename(filename):
+        date = time.strftime('%Y%m%d%H%M%S')
+        return "{}_{}_{}".format(date, str(uuid.uuid4())[:8], ntpath.basename(filename))
+
+    def _upload_file_temp(self, bucket, local_file):
+        """
+        Upload a local file to a Google Cloud Storage bucket.
+        """
+        temp_filename = self._generate_temp_filename(local_file)
+        if not bucket:
+            raise AirflowException(
+                "If you want Airflow to upload the local file to a temporary bucket, set "
+                "the 'temp_bucket' key in the connection string")
+
+        self.log.info("Uploading %s to %s", local_file, temp_filename)
+
+        GCSHook(
+            google_cloud_storage_conn_id=self.gcp_conn_id
+        ).upload(
+            bucket_name=bucket,
+            object_name=temp_filename,
+            mime_type='application/x-python',
+            filename=local_file
+        )
+        return "gs://{}/{}".format(bucket, temp_filename)
+
+    @apply_defaults
+    def __init__(
+        self,
+        main: str,
+        arguments: Optional[List] = None,
+        archives: Optional[List] = None,
+        pyfiles: Optional[List] = None,
+        files: Optional[List] = None,
+        *args,
+        **kwargs
+    ) -> None:
+        # TODO: Remove one day
+        warnings.warn(
+            "The `{cls}` operator is deprecated, please use `DataprocSubmitJobOperator` instead. You can use"
+            " `generate_job` method of `{cls}` to generate dictionary representing your job"
+            " and use it with the new operator.".format(cls=type(self).__name__),
+            DeprecationWarning,
+            stacklevel=1
+        )
+
+        super().__init__(*args, **kwargs)
+        self.main = main
+        self.arguments = arguments
+        self.archives = archives
+        self.files = files
+        self.pyfiles = pyfiles
+
+    def generate_job(self):
+        """
+        Helper method for easier migration to `DataprocSubmitJobOperator`.
+        :return: Dict representing Dataproc job
+        """
+        self.create_job_template()
+        #  If the file is local, point `main` at the cluster's staging bucket
+        #  (note: unlike execute(), this method does not upload the file)
+        if os.path.isfile(self.main):
+            cluster_info = self.hook.get_cluster(
+                project_id=self.hook.project_id,
+                region=self.region,
+                cluster_name=self.cluster_name
+            )
+            bucket = cluster_info['config']['config_bucket']
+            self.main = "gs://{}/{}".format(bucket, self.main)
+        self.job_template.set_python_main(self.main)
+        self.job_template.add_args(self.arguments)
+        self.job_template.add_archive_uris(self.archives)
+        self.job_template.add_file_uris(self.files)
+        self.job_template.add_python_file_uris(self.pyfiles)
+
+        return self._generate_job_template()
+
+    def execute(self, context):
+        self.create_job_template()
+        #  If the main file is local, upload it to the cluster's staging (config) bucket
+        if os.path.isfile(self.main):
+            cluster_info = self.hook.get_cluster(
+                project_id=self.hook.project_id,
+                region=self.region,
+                cluster_name=self.cluster_name
+            )
+            bucket = cluster_info['config']['config_bucket']
+            self.main = self._upload_file_temp(bucket, self.main)
+
+        self.job_template.set_python_main(self.main)
+        self.job_template.add_args(self.arguments)
+        self.job_template.add_archive_uris(self.archives)
+        self.job_template.add_file_uris(self.files)
+        self.job_template.add_python_file_uris(self.pyfiles)
+
+        super().execute(context)
+
+
+class DataprocInstantiateWorkflowTemplateOperator(BaseOperator):
+    """
+    Instantiate a WorkflowTemplate on Google Cloud Dataproc. The operator will wait
+    until the WorkflowTemplate is finished executing.
+
+    .. seealso::
+        Please refer to:
+        https://cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.workflowTemplates/instantiate
+
+    :param template_id: The id of the template. (templated)
+    :type template_id: str
+    :param project_id: The ID of the google cloud project in which
+        the template runs
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param version: Optional. The version of the workflow template to instantiate. If specified,
+        the workflow will be instantiated only if the current version of the workflow template
+        has the supplied version.
+    :type version: int
+    :param parameters: Optional. Map from parameter names to values that should be used for those
+        parameters. Values may not exceed 100 characters. Example:
+        ``{"date_from": "2019-08-01", "date_to": "2019-08-02"}``. Please refer to:
+        https://cloud.google.com/dataproc/docs/concepts/workflows/workflow-parameters
+    :type parameters: Dict[str, str]
+    :param request_id: Optional. A unique id used to identify the request. If the server receives
+        two instantiation requests with the same id, the second request is ignored and the
+        operation created and stored in the backend by the first request is returned.
+        It is recommended to always set this value to a UUID.
+    :type request_id: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
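+
+    A minimal usage sketch; the template id, project id, region and parameter values
+    below are illustrative placeholders::
+
+        instantiate_template = DataprocInstantiateWorkflowTemplateOperator(
+            task_id="instantiate_template",
+            template_id="sparkpi-template",
+            project_id="my-project",
+            region="europe-west1",
+            parameters={"date_from": "2019-08-01", "date_to": "2019-08-02"},
+        )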
+    """
+
+    template_fields = ['template_id']
+
+    @apply_defaults
+    def __init__(  # pylint: disable=too-many-arguments
+        self,
+        template_id: str,
+        region: str,
+        project_id: Optional[str] = None,
+        version: Optional[int] = None,
+        request_id: Optional[str] = None,
+        parameters: Optional[Dict[str, str]] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        *args,
+        **kwargs
+    ) -> None:
+        super().__init__(*args, **kwargs)
+
+        self.template_id = template_id
+        self.parameters = parameters
+        self.version = version
+        self.project_id = project_id
+        self.region = region
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.request_id = request_id
+        self.gcp_conn_id = gcp_conn_id
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
+        self.log.info('Instantiating template %s', self.template_id)
+        operation = hook.instantiate_workflow_template(
+            project_id=self.project_id,
+            location=self.region,
+            template_name=self.template_id,
+            version=self.version,
+            request_id=self.request_id,
+            parameters=self.parameters,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        operation.result()
+        self.log.info('Template instantiated.')
+
+
+class DataprocInstantiateInlineWorkflowTemplateOperator(BaseOperator):
+    """
+    Instantiate a WorkflowTemplate Inline on Google Cloud Dataproc. The operator will
+    wait until the WorkflowTemplate is finished executing.
+
+    .. seealso::
+        Please refer to:
+        https://cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.workflowTemplates/instantiateInline
+
+    :param template: The template contents. (templated)
+    :type template: dict
+    :param project_id: The ID of the google cloud project in which
+        the template runs
+    :type project_id: str
+    :param region: Required. The Cloud Dataproc region in which to handle the request.
+    :type region: str
+    :param request_id: Optional. A unique id used to identify the request. If the server receives
+        two instantiation requests with the same id, the second request is ignored and the
+        operation created and stored in the backend by the first request is returned.
+        It is recommended to always set this value to a UUID.
+    :type request_id: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
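+
+    A minimal usage sketch; the template body below is an illustrative, heavily trimmed
+    inline workflow template, and the project id, region and cluster config are placeholders::
+
+        template = {
+            "placement": {
+                "managed_cluster": {
+                    "cluster_name": "adhoc-cluster",
+                    "config": {},  # cluster config omitted for brevity
+                }
+            },
+            "jobs": [
+                {
+                    "step_id": "sparkpi",
+                    "spark_job": {
+                        "main_class": "org.apache.spark.examples.SparkPi",
+                        "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
+                    },
+                }
+            ],
+        }
+        instantiate_inline = DataprocInstantiateInlineWorkflowTemplateOperator(
+            task_id="instantiate_inline",
+            template=template,
+            project_id="my-project",
+            region="europe-west1",
+        )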
+    """
+
+    template_fields = ['template']
+
+    @apply_defaults
+    def __init__(
+        self,
+        template: Dict,
+        region: str,
+        project_id: Optional[str] = None,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        *args,
+        **kwargs
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.template = template
+        self.project_id = project_id
+        self.location = region
+        self.request_id = request_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+
+    def execute(self, context):
+        self.log.info('Instantiating Inline Template')
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
+        operation = hook.instantiate_inline_workflow_template(
+            template=self.template,
+            project_id=self.project_id,
+            location=self.location,
+            request_id=self.request_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        operation.result()
+        self.log.info('Template instantiated.')
+
+
+class DataprocSubmitJobOperator(BaseOperator):
+    """
+    Submits a job to a cluster.
+
+    :param project_id: Required. The ID of the Google Cloud Platform project that the job belongs to.
+    :type project_id: str
+    :param location: Required. The Cloud Dataproc region in which to handle the request.
+    :type location: str
+    :param job: Required. The job resource.
+        If a dict is provided, it must be of the same form as the protobuf message
+        :class:`~google.cloud.dataproc_v1beta2.types.Job`
+    :type job: Dict
+    :param request_id: Optional. A unique id used to identify the request. If the server receives two
+        ``SubmitJobRequest`` requests with the same id, then the second request will be ignored and the first
+        ``Job`` created and stored in the backend is returned.
+        It is recommended to always set this value to a UUID.
+    :type request_id: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
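+
+    A minimal usage sketch; the project id, region, cluster name and ``gs://`` URI below
+    are illustrative placeholders::
+
+        pyspark_job = {
+            "reference": {"project_id": "my-project"},
+            "placement": {"cluster_name": "cluster-1"},
+            "pyspark_job": {"main_python_file_uri": "gs://my-bucket/jobs/hello_world.py"},
+        }
+        submit_job = DataprocSubmitJobOperator(
+            task_id="submit_job",
+            project_id="my-project",
+            location="europe-west1",
+            job=pyspark_job,
+        )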
+    """
+
+    template_fields = ('project_id', 'location', 'job')
+
+    @apply_defaults
+    def __init__(
+        self,
+        project_id: str,
+        location: str,
+        job: Dict,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        *args,
+        **kwargs
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.location = location
+        self.job = job
+        self.request_id = request_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+
+    def execute(self, context: Dict):
+        self.log.info("Submitting job")
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
+        job_object = hook.submit_job(
+            project_id=self.project_id,
+            location=self.location,
+            job=self.job,
+            request_id=self.request_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        job_id = job_object.reference.job_id
+        self.log.info("Waiting for job %s to complete", job_id)
+        hook.wait_for_job(
+            job_id=job_id,
+            project_id=self.project_id,
+            location=self.location
+        )
+        self.log.info("Job completed successfully.")
+
+
+class DataprocUpdateClusterOperator(BaseOperator):
+    """
+    Updates a cluster in a project.
+
+    :param project_id: Required. The ID of the Google Cloud Platform project the cluster belongs to.
+    :type project_id: str
+    :param location: Required. The Cloud Dataproc region in which to handle the request.
+    :type location: str
+    :param cluster_name: Required. The cluster name.
+    :type cluster_name: str
+    :param cluster: Required. The changes to the cluster.
+
+        If a dict is provided, it must be of the same form as the protobuf message
+        :class:`~google.cloud.dataproc_v1beta2.types.Cluster`
+    :type cluster: Union[Dict, google.cloud.dataproc_v1beta2.types.Cluster]
+    :param update_mask: Required. Specifies the path, relative to ``Cluster``, of the field to update. For
+        example, to change the number of workers in a cluster to 5, the ``update_mask`` parameter would be
+        specified as ``config.worker_config.num_instances``, and the ``PATCH`` request body would specify the
+        new value. If a dict is provided, it must be of the same form as the protobuf message
+        :class:`~google.cloud.dataproc_v1beta2.types.FieldMask`
+    :type update_mask: Union[Dict, google.cloud.dataproc_v1beta2.types.FieldMask]
+    :param graceful_decommission_timeout: Optional. Timeout for graceful YARN decommissioning. Graceful
+        decommissioning allows removing nodes from the cluster without interrupting jobs in progress. Timeout
+        specifies how long to wait for jobs in progress to finish before forcefully removing nodes (and
+        potentially interrupting jobs). Default timeout is 0 (for forceful decommission), and the maximum
+        allowed timeout is 1 day.
+    :type graceful_decommission_timeout: Union[Dict, google.cloud.dataproc_v1beta2.types.Duration]
+    :param request_id: Optional. A unique id used to identify the request. If the server receives two
+        ``UpdateClusterRequest`` requests with the same id, then the second request will be ignored and the
+        first ``google.longrunning.Operation`` created and stored in the backend is returned.
+    :type request_id: str
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
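+
+    For example, to change the number of workers in ``cluster-1`` to five, as described for
+    ``update_mask`` above (a sketch; the project id, region and timeout are illustrative
+    placeholders)::
+
+        scale_cluster = DataprocUpdateClusterOperator(
+            task_id="scale_cluster",
+            project_id="my-project",
+            location="europe-west1",
+            cluster_name="cluster-1",
+            cluster={"config": {"worker_config": {"num_instances": 5}}},
+            update_mask={"paths": ["config.worker_config.num_instances"]},
+            graceful_decommission_timeout={"seconds": 600},
+        )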
+    """
+
+    @apply_defaults
+    def __init__(  # pylint: disable=too-many-arguments
+        self,
+        location: str,
+        cluster_name: str,
+        cluster: Union[Dict, Cluster],
+        update_mask: Union[Dict, FieldMask],
+        graceful_decommission_timeout: Union[Dict, Duration],
+        request_id: Optional[str] = None,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        *args,
+        **kwargs
+    ):
+        super().__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.location = location
+        self.cluster_name = cluster_name
+        self.cluster = cluster
+        self.update_mask = update_mask
+        self.graceful_decommission_timeout = graceful_decommission_timeout
+        self.request_id = request_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+
+    def execute(self, context: Dict):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
+        self.log.info("Updating %s cluster.", self.cluster_name)
+        operation = hook.update_cluster(
+            project_id=self.project_id,
+            location=self.location,
+            cluster_name=self.cluster_name,
+            cluster=self.cluster,
+            update_mask=self.update_mask,
+            graceful_decommission_timeout=self.graceful_decommission_timeout,
+            request_id=self.request_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        operation.result()
+        self.log.info("Updated %s cluster.", self.cluster_name)