Posted to commits@airflow.apache.org by di...@apache.org on 2020/06/19 17:31:44 UTC
[airflow] branch v1-10-test updated: Add 'main' param to
template_fields in DataprocSubmitPySparkJobOperator (#9154)
This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-test by this push:
new 4950aaa Add 'main' param to template_fields in DataprocSubmitPySparkJobOperator (#9154)
4950aaa is described below
commit 4950aaa8e9e16843c012a66de418717da4a2c351
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Jun 5 15:39:42 2020 +0100
Add 'main' param to template_fields in DataprocSubmitPySparkJobOperator (#9154)
(cherry picked from commit 9bcdadaf7e6e73d3d2246fbbd32a9f30a1b43ca9)
---
.../providers/google/cloud/operators/dataproc.py | 1708 ++++++++++++++++++++
1 file changed, 1708 insertions(+)
diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
new file mode 100644
index 0000000..8c4a002
--- /dev/null
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -0,0 +1,1708 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""
+This module contains Google Dataproc operators.
+"""
+# pylint: disable=C0302
+
+import inspect
+import ntpath
+import os
+import re
+import time
+import uuid
+import warnings
+from datetime import datetime, timedelta
+from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
+
+from google.api_core.exceptions import AlreadyExists
+from google.api_core.retry import Retry
+from google.cloud.dataproc_v1beta2.types import ( # pylint: disable=no-name-in-module
+ Cluster, Duration, FieldMask,
+)
+from google.protobuf.json_format import MessageToDict
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.hooks.dataproc import DataprocHook, DataProcJobBuilder
+from airflow.providers.google.cloud.hooks.gcs import GCSHook
+from airflow.utils import timezone
+from airflow.utils.decorators import apply_defaults
+from airflow.version import version as airflow_version
+
+
+# pylint: disable=too-many-instance-attributes
+class ClusterGenerator:
+ """
+ Create a new Dataproc Cluster.
+
+ :param cluster_name: The name of the DataProc cluster to create. (templated)
+ :type cluster_name: str
+ :param project_id: The ID of the google cloud project in which
+ to create the cluster. (templated)
+ :type project_id: str
+ :param num_workers: The # of workers to spin up. If set to zero, the
+ cluster will be spun up in single-node mode
+ :type num_workers: int
+ :param storage_bucket: The storage bucket to use; setting it to None lets Dataproc
+ generate a custom one for you
+ :type storage_bucket: str
+ :param init_actions_uris: List of GCS URIs containing
+ Dataproc initialization scripts
+ :type init_actions_uris: list[str]
+ :param init_action_timeout: Amount of time executable scripts in
+ init_actions_uris have to complete
+ :type init_action_timeout: str
+ :param metadata: dict of key-value google compute engine metadata entries
+ to add to all instances
+ :type metadata: dict
+ :param image_version: the version of software inside the Dataproc cluster
+ :type image_version: str
+ :param custom_image: custom Dataproc image; for more info see
+ https://cloud.google.com/dataproc/docs/guides/dataproc-images
+ :type custom_image: str
+ :param custom_image_project_id: project id for the custom Dataproc image; for more info see
+ https://cloud.google.com/dataproc/docs/guides/dataproc-images
+ :type custom_image_project_id: str
+ :param autoscaling_policy: The autoscaling policy used by the cluster. Only resource names
+ including projectid and location (region) are valid. Example:
+ ``projects/[projectId]/locations/[dataproc_region]/autoscalingPolicies/[policy_id]``
+ :type autoscaling_policy: str
+ :param properties: dict of properties to set on
+ config files (e.g. spark-defaults.conf), see
+ https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters#SoftwareConfig
+ :type properties: dict
+ :param optional_components: List of optional cluster components, for more info see
+ https://cloud.google.com/dataproc/docs/reference/rest/v1/ClusterConfig#Component
+ :type optional_components: list[str]
+ :param num_masters: The # of master nodes to spin up
+ :type num_masters: int
+ :param master_machine_type: Compute engine machine type to use for the master node
+ :type master_machine_type: str
+ :param master_disk_type: Type of the boot disk for the master node
+ (default is ``pd-standard``).
+ Valid values: ``pd-ssd`` (Persistent Disk Solid State Drive) or
+ ``pd-standard`` (Persistent Disk Hard Disk Drive).
+ :type master_disk_type: str
+ :param master_disk_size: Disk size for the master node
+ :type master_disk_size: int
+ :param worker_machine_type: Compute engine machine type to use for the worker nodes
+ :type worker_machine_type: str
+ :param worker_disk_type: Type of the boot disk for the worker node
+ (default is ``pd-standard``).
+ Valid values: ``pd-ssd`` (Persistent Disk Solid State Drive) or
+ ``pd-standard`` (Persistent Disk Hard Disk Drive).
+ :type worker_disk_type: str
+ :param worker_disk_size: Disk size for the worker nodes
+ :type worker_disk_size: int
+ :param num_preemptible_workers: The # of preemptible worker nodes to spin up
+ :type num_preemptible_workers: int
+ :param labels: dict of labels to add to the cluster
+ :type labels: dict
+ :param zone: The zone where the cluster will be located. Set to None to auto-zone. (templated)
+ :type zone: str
+ :param network_uri: The network uri to be used for machine communication, cannot be
+ specified with subnetwork_uri
+ :type network_uri: str
+ :param subnetwork_uri: The subnetwork uri to be used for machine communication,
+ cannot be specified with network_uri
+ :type subnetwork_uri: str
+ :param internal_ip_only: If true, all instances in the cluster will only
+ have internal IP addresses. This can only be enabled for subnetwork
+ enabled networks
+ :type internal_ip_only: bool
+ :param tags: The GCE tags to add to all instances
+ :type tags: list[str]
+ :param region: leave as 'global', might become relevant in the future. (templated)
+ :type region: str
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+ :type gcp_conn_id: str
+ :param delegate_to: The account to impersonate, if any.
+ For this to work, the service account making the request must have domain-wide
+ delegation enabled.
+ :type delegate_to: str
+ :param service_account: The service account of the dataproc instances.
+ :type service_account: str
+ :param service_account_scopes: The URIs of service account scopes to be included.
+ :type service_account_scopes: list[str]
+ :param idle_delete_ttl: The longest duration that the cluster will stay alive while
+ idle. Exceeding this threshold causes the cluster to be auto-deleted.
+ A duration in seconds.
+ :type idle_delete_ttl: int
+ :param auto_delete_time: The time when the cluster will be auto-deleted.
+ :type auto_delete_time: datetime.datetime
+ :param auto_delete_ttl: The lifetime duration of the cluster; the cluster will be
+ auto-deleted at the end of this duration.
+ A duration in seconds. (If auto_delete_time is set, this parameter is ignored)
+ :type auto_delete_ttl: int
+ :param customer_managed_key: The customer-managed key used for disk encryption
+ ``projects/[PROJECT_STORING_KEYS]/locations/[LOCATION]/keyRings/[KEY_RING_NAME]/cryptoKeys/[KEY_NAME]`` # noqa # pylint: disable=line-too-long
+ :type customer_managed_key: str
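+
+ **Example** (a minimal sketch; the project, zone and machine types are illustrative,
+ and the resulting dict is meant to be passed to ``DataprocCreateClusterOperator``
+ via its ``cluster`` parameter)::
+
+     cluster_config = ClusterGenerator(
+         project_id='my-project',
+         cluster_name='cluster-1',
+         num_workers=2,
+         zone='europe-west1-b',
+         master_machine_type='n1-standard-4',
+         worker_machine_type='n1-standard-4').make()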
+ """
+ # pylint: disable=too-many-arguments,too-many-locals
+ def __init__(self,
+ project_id: Optional[str] = None,
+ cluster_name: Optional[str] = None,
+ num_workers: Optional[int] = None,
+ zone: Optional[str] = None,
+ network_uri: Optional[str] = None,
+ subnetwork_uri: Optional[str] = None,
+ internal_ip_only: Optional[bool] = None,
+ tags: Optional[List[str]] = None,
+ storage_bucket: Optional[str] = None,
+ init_actions_uris: Optional[List[str]] = None,
+ init_action_timeout: str = "10m",
+ metadata: Optional[Dict] = None,
+ custom_image: Optional[str] = None,
+ custom_image_project_id: Optional[str] = None,
+ image_version: Optional[str] = None,
+ autoscaling_policy: Optional[str] = None,
+ properties: Optional[Dict] = None,
+ optional_components: Optional[List[str]] = None,
+ num_masters: int = 1,
+ master_machine_type: str = 'n1-standard-4',
+ master_disk_type: str = 'pd-standard',
+ master_disk_size: int = 1024,
+ worker_machine_type: str = 'n1-standard-4',
+ worker_disk_type: str = 'pd-standard',
+ worker_disk_size: int = 1024,
+ num_preemptible_workers: int = 0,
+ labels: Optional[Dict] = None,
+ region: Optional[str] = None,
+ service_account: Optional[str] = None,
+ service_account_scopes: Optional[List[str]] = None,
+ idle_delete_ttl: Optional[int] = None,
+ auto_delete_time: Optional[datetime] = None,
+ auto_delete_ttl: Optional[int] = None,
+ customer_managed_key: Optional[str] = None,
+ *args, # just in case
+ **kwargs
+ ) -> None:
+
+ self.cluster_name = cluster_name
+ self.project_id = project_id
+ self.region = region
+ self.num_masters = num_masters
+ self.num_workers = num_workers
+ self.num_preemptible_workers = num_preemptible_workers
+ self.storage_bucket = storage_bucket
+ self.init_actions_uris = init_actions_uris
+ self.init_action_timeout = init_action_timeout
+ self.metadata = metadata
+ self.custom_image = custom_image
+ self.custom_image_project_id = custom_image_project_id
+ self.image_version = image_version
+ self.properties = properties or dict()
+ self.optional_components = optional_components
+ self.master_machine_type = master_machine_type
+ self.master_disk_type = master_disk_type
+ self.master_disk_size = master_disk_size
+ self.autoscaling_policy = autoscaling_policy
+ self.worker_machine_type = worker_machine_type
+ self.worker_disk_type = worker_disk_type
+ self.worker_disk_size = worker_disk_size
+ self.labels = labels
+ self.zone = zone
+ self.network_uri = network_uri
+ self.subnetwork_uri = subnetwork_uri
+ self.internal_ip_only = internal_ip_only
+ self.tags = tags
+ self.service_account = service_account
+ self.service_account_scopes = service_account_scopes
+ self.idle_delete_ttl = idle_delete_ttl
+ self.auto_delete_time = auto_delete_time
+ self.auto_delete_ttl = auto_delete_ttl
+ self.customer_managed_key = customer_managed_key
+ self.single_node = num_workers == 0
+
+ if self.custom_image and self.image_version:
+ raise ValueError("The custom_image and image_version can't be both set")
+
+ if self.single_node and self.num_preemptible_workers > 0:
+ raise ValueError("Single node cannot have preemptible workers.")
+
+ def _get_init_action_timeout(self):
+ match = re.match(r"^(\d+)([sm])$", self.init_action_timeout)
+ if match:
+ if match.group(2) == "s":
+ return self.init_action_timeout
+ elif match.group(2) == "m":
+ val = float(match.group(1))
+ return "{}s".format(int(timedelta(minutes=val).total_seconds()))
+
+ raise AirflowException(
+ "DataprocClusterCreateOperator init_action_timeout"
+ " should be expressed in minutes or seconds. i.e. 10m, 30s")
+
+ def _build_gce_cluster_config(self, cluster_data):
+ if self.zone:
+ zone_uri = \
+ 'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
+ self.project_id, self.zone
+ )
+ cluster_data['config']['gce_cluster_config']['zone_uri'] = zone_uri
+
+ if self.metadata:
+ cluster_data['config']['gce_cluster_config']['metadata'] = self.metadata
+
+ if self.network_uri:
+ cluster_data['config']['gce_cluster_config']['network_uri'] = self.network_uri
+
+ if self.subnetwork_uri:
+ cluster_data['config']['gce_cluster_config']['subnetwork_uri'] = \
+ self.subnetwork_uri
+
+ if self.internal_ip_only:
+ if not self.subnetwork_uri:
+ raise AirflowException("Set internal_ip_only to true only when"
+ " you pass a subnetwork_uri.")
+ cluster_data['config']['gce_cluster_config']['internal_ip_only'] = True
+
+ if self.tags:
+ cluster_data['config']['gce_cluster_config']['tags'] = self.tags
+
+ if self.service_account:
+ cluster_data['config']['gce_cluster_config']['service_account'] = \
+ self.service_account
+
+ if self.service_account_scopes:
+ cluster_data['config']['gce_cluster_config']['service_account_scopes'] = \
+ self.service_account_scopes
+
+ return cluster_data
+
+ def _build_lifecycle_config(self, cluster_data):
+ if self.idle_delete_ttl:
+ cluster_data['config']['lifecycle_config']['idle_delete_ttl'] = \
+ "{}s".format(self.idle_delete_ttl)
+
+ if self.auto_delete_time:
+ utc_auto_delete_time = timezone.convert_to_utc(self.auto_delete_time)
+ cluster_data['config']['lifecycle_config']['auto_delete_time'] = \
+ utc_auto_delete_time.format('%Y-%m-%dT%H:%M:%S.%fZ', formatter='classic')
+ elif self.auto_delete_ttl:
+ cluster_data['config']['lifecycle_config']['auto_delete_ttl'] = \
+ "{}s".format(self.auto_delete_ttl)
+
+ return cluster_data
+
+ def _build_cluster_data(self):
+ if self.zone:
+ master_type_uri = \
+ "https://www.googleapis.com/compute/v1/projects/{}/zones/{}/machineTypes/{}".format(
+ self.project_id, self.zone, self.master_machine_type)
+ worker_type_uri = \
+ "https://www.googleapis.com/compute/v1/projects/{}/zones/{}/machineTypes/{}".format(
+ self.project_id, self.zone, self.worker_machine_type)
+ else:
+ master_type_uri = self.master_machine_type
+ worker_type_uri = self.worker_machine_type
+
+ cluster_data = {
+ 'project_id': self.project_id,
+ 'cluster_name': self.cluster_name,
+ 'config': {
+ 'gce_cluster_config': {
+ },
+ 'master_config': {
+ 'num_instances': self.num_masters,
+ 'machine_type_uri': master_type_uri,
+ 'disk_config': {
+ 'boot_disk_type': self.master_disk_type,
+ 'boot_disk_size_gb': self.master_disk_size
+ }
+ },
+ 'worker_config': {
+ 'num_instances': self.num_workers,
+ 'machine_type_uri': worker_type_uri,
+ 'disk_config': {
+ 'boot_disk_type': self.worker_disk_type,
+ 'boot_disk_size_gb': self.worker_disk_size
+ }
+ },
+ 'secondary_worker_config': {},
+ 'software_config': {},
+ 'lifecycle_config': {},
+ 'encryption_config': {},
+ 'autoscaling_config': {},
+ }
+ }
+ if self.num_preemptible_workers > 0:
+ cluster_data['config']['secondary_worker_config'] = {
+ 'num_instances': self.num_preemptible_workers,
+ 'machine_type_uri': worker_type_uri,
+ 'disk_config': {
+ 'boot_disk_type': self.worker_disk_type,
+ 'boot_disk_size_gb': self.worker_disk_size
+ },
+ 'is_preemptible': True
+ }
+
+ cluster_data['labels'] = self.labels or {}
+
+ # Dataproc labels must conform to the following regex:
+ # [a-z]([-a-z0-9]*[a-z0-9])? (current airflow version string follows
+ # semantic versioning spec: x.y.z).
+ cluster_data['labels'].update({
+ 'airflow-version': 'v' + airflow_version.replace('.', '-').replace('+', '-')
+ })
+ if self.storage_bucket:
+ cluster_data['config']['config_bucket'] = self.storage_bucket
+
+ if self.image_version:
+ cluster_data['config']['software_config']['image_version'] = self.image_version
+
+ elif self.custom_image:
+ project_id = self.custom_image_project_id or self.project_id
+ custom_image_url = 'https://www.googleapis.com/compute/beta/projects/' \
+ '{}/global/images/{}'.format(project_id,
+ self.custom_image)
+ cluster_data['config']['master_config']['image_uri'] = custom_image_url
+ if not self.single_node:
+ cluster_data['config']['worker_config']['image_uri'] = custom_image_url
+
+ cluster_data = self._build_gce_cluster_config(cluster_data)
+
+ if self.single_node:
+ self.properties["dataproc:dataproc.allow.zero.workers"] = "true"
+
+ if self.properties:
+ cluster_data['config']['software_config']['properties'] = self.properties
+
+ if self.optional_components:
+ cluster_data['config']['software_config']['optional_components'] = self.optional_components
+
+ cluster_data = self._build_lifecycle_config(cluster_data)
+
+ if self.init_actions_uris:
+ init_actions_dict = [
+ {
+ 'executable_file': uri,
+ 'execution_timeout': self._get_init_action_timeout()
+ } for uri in self.init_actions_uris
+ ]
+ cluster_data['config']['initialization_actions'] = init_actions_dict
+
+ if self.customer_managed_key:
+ cluster_data['config']['encryption_config'] = \
+ {'gce_pd_kms_key_name': self.customer_managed_key}
+ if self.autoscaling_policy:
+ cluster_data['config']['autoscaling_config'] = {'policy_uri': self.autoscaling_policy}
+
+ return cluster_data
+
+ def make(self):
+ """
+ Helper method for easier migration.
+ :return: Dict representing Dataproc cluster.
+ """
+ return self._build_cluster_data()
+
+
+# pylint: disable=too-many-instance-attributes
+class DataprocCreateClusterOperator(BaseOperator):
+ """
+ Create a new cluster on Google Cloud Dataproc. The operator will wait until the
+ creation is successful or an error occurs in the creation process.
+
+ The parameters allow you to configure the cluster. Please refer to
+
+ https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters
+
+ for a detailed explanation on the different parameters. Most of the configuration
+ parameters detailed in the link are available as a parameter to this operator.
+
+ :param project_id: The ID of the google cloud project in which
+ to create the cluster. (templated)
+ :type project_id: str
+ :param region: leave as 'global', might become relevant in the future. (templated)
+ :type region: str
+ :param request_id: Optional. A unique id used to identify the request. If the server receives two
+ ``CreateClusterRequest`` requests with the same id, then the second request will be ignored and the
+ first ``google.longrunning.Operation`` created and stored in the backend is returned.
+ :type request_id: str
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+ :type gcp_conn_id: str
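+
+ **Example** (a minimal sketch; ``cluster_config`` is a cluster dict such as one produced
+ by ``ClusterGenerator(...).make()``, and the other values are illustrative)::
+
+     create_cluster = DataprocCreateClusterOperator(
+         task_id='create_cluster',
+         project_id='my-project',
+         region='europe-west1',
+         cluster=cluster_config,
+         dag=dag)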
+ """
+
+ template_fields = ('project_id', 'region', 'cluster')
+
+ @apply_defaults
+ def __init__(self,
+ region: str = 'global',
+ project_id: Optional[str] = None,
+ cluster: Optional[Dict] = None,
+ request_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ gcp_conn_id: str = "google_cloud_default",
+ *args,
+ **kwargs) -> None:
+ # TODO: remove one day
+ if cluster is None:
+ warnings.warn(
+ "Passing cluster parameters by keywords to `{}` "
+ "will be deprecated. Please provide cluster object using `cluster` parameter. "
+ "You can use `airflow.dataproc.ClusterGenerator.generate_cluster` method to "
+ "obtain cluster object.".format(type(self).__name__),
+ DeprecationWarning, stacklevel=1
+ )
+ # Remove result of apply defaults
+ if 'params' in kwargs:
+ del kwargs['params']
+
+ # Create cluster object from kwargs
+ kwargs['region'] = region
+ kwargs['project_id'] = project_id
+ cluster = ClusterGenerator(**kwargs).make()
+
+ # Remove from kwargs cluster params passed for backward compatibility
+ cluster_params = inspect.signature(ClusterGenerator.__init__).parameters
+ for arg in cluster_params:
+ if arg in kwargs:
+ del kwargs[arg]
+
+ super().__init__(*args, **kwargs)
+
+ self.cluster = cluster
+ self.cluster_name = cluster.get('cluster_name')
+ self.project_id = project_id
+ self.region = region
+ self.request_id = request_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+
+ def execute(self, context):
+ self.log.info('Creating cluster: %s', self.cluster_name)
+ hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
+ try:
+ operation = hook.create_cluster(
+ project_id=self.project_id,
+ region=self.region,
+ cluster=self.cluster,
+ request_id=self.request_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ cluster = operation.result()
+ self.log.info("Cluster created.")
+ except AlreadyExists:
+ cluster = hook.get_cluster(
+ project_id=self.project_id,
+ region=self.region,
+ cluster_name=self.cluster_name,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ self.log.info("Cluster already exists.")
+ return MessageToDict(cluster)
+
+
+class DataprocScaleClusterOperator(BaseOperator):
+ """
+ Scale, up or down, a cluster on Google Cloud Dataproc.
+ The operator will wait until the cluster is re-scaled.
+
+ **Example**: ::
+
+ t1 = DataprocClusterScaleOperator(
+ task_id='dataproc_scale',
+ project_id='my-project',
+ cluster_name='cluster-1',
+ num_workers=10,
+ num_preemptible_workers=10,
+ graceful_decommission_timeout='1h',
+ dag=dag)
+
+ .. seealso::
+ For more detail about scaling clusters, have a look at the reference:
+ https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/scaling-clusters
+
+ :param cluster_name: The name of the cluster to scale. (templated)
+ :type cluster_name: str
+ :param project_id: The ID of the google cloud project in which
+ the cluster runs. (templated)
+ :type project_id: str
+ :param region: The region for the dataproc cluster. (templated)
+ :type region: str
+ :param num_workers: The new number of workers
+ :type num_workers: int
+ :param num_preemptible_workers: The new number of preemptible workers
+ :type num_preemptible_workers: int
+ :param graceful_decommission_timeout: Timeout for graceful YARN decommissioning.
+ Maximum value is 1d
+ :type graceful_decommission_timeout: str
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+ :type gcp_conn_id: str
+ """
+
+ template_fields = ['cluster_name', 'project_id', 'region']
+
+ @apply_defaults
+ def __init__(self,
+ cluster_name: str,
+ project_id: Optional[str] = None,
+ region: str = 'global',
+ num_workers: int = 2,
+ num_preemptible_workers: int = 0,
+ graceful_decommission_timeout: Optional[str] = None,
+ gcp_conn_id: str = "google_cloud_default",
+ *args,
+ **kwargs) -> None:
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.cluster_name = cluster_name
+ self.num_workers = num_workers
+ self.num_preemptible_workers = num_preemptible_workers
+ self.graceful_decommission_timeout = graceful_decommission_timeout
+ self.gcp_conn_id = gcp_conn_id
+
+ # TODO: Remove one day
+ warnings.warn(
+ "The `{cls}` operator is deprecated, please use `DataprocUpdateClusterOperator` instead.".format(
+ cls=type(self).__name__
+ ),
+ DeprecationWarning,
+ stacklevel=1
+ )
+
+ def _build_scale_cluster_data(self):
+ scale_data = {
+ 'config': {
+ 'worker_config': {
+ 'num_instances': self.num_workers
+ },
+ 'secondary_worker_config': {
+ 'num_instances': self.num_preemptible_workers
+ }
+ }
+ }
+ return scale_data
+
+ @property
+ def _graceful_decommission_timeout_object(self) -> Optional[Dict]:
+ if not self.graceful_decommission_timeout:
+ return None
+
+ timeout = None
+ match = re.match(r"^(\d+)([smdh])$", self.graceful_decommission_timeout)
+ if match:
+ if match.group(2) == "s":
+ timeout = int(match.group(1))
+ elif match.group(2) == "m":
+ val = float(match.group(1))
+ timeout = int(timedelta(minutes=val).total_seconds())
+ elif match.group(2) == "h":
+ val = float(match.group(1))
+ timeout = int(timedelta(hours=val).total_seconds())
+ elif match.group(2) == "d":
+ val = float(match.group(1))
+ timeout = int(timedelta(days=val).total_seconds())
+
+ if not timeout:
+ raise AirflowException(
+ "DataprocClusterScaleOperator "
+ " should be expressed in day, hours, minutes or seconds. "
+ " i.e. 1d, 4h, 10m, 30s"
+ )
+
+ return {'seconds': timeout}
+
+ def execute(self, context):
+ """
+ Scale, up or down, a cluster on Google Cloud Dataproc.
+ """
+ self.log.info("Scaling cluster: %s", self.cluster_name)
+
+ scaling_cluster_data = self._build_scale_cluster_data()
+ update_mask = [
+ "config.worker_config.num_instances",
+ "config.secondary_worker_config.num_instances"
+ ]
+
+ hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
+ operation = hook.update_cluster(
+ project_id=self.project_id,
+ location=self.region,
+ cluster_name=self.cluster_name,
+ cluster=scaling_cluster_data,
+ graceful_decommission_timeout=self._graceful_decommission_timeout_object,
+ update_mask={'paths': update_mask},
+ )
+ operation.result()
+ self.log.info("Cluster scaling finished")
+
+
+class DataprocDeleteClusterOperator(BaseOperator):
+ """
+ Deletes a cluster in a project.
+
+ :param project_id: Required. The ID of the Google Cloud Platform project that the cluster belongs to.
+ :type project_id: str
+ :param region: Required. The Cloud Dataproc region in which to handle the request.
+ :type region: str
+ :param cluster_name: Required. The cluster name.
+ :type cluster_name: str
+ :param cluster_uuid: Optional. Specifying the ``cluster_uuid`` means the RPC should fail
+ if a cluster with the specified UUID does not exist.
+ :type cluster_uuid: str
+ :param request_id: Optional. A unique id used to identify the request. If the server receives two
+ ``DeleteClusterRequest`` requests with the same id, then the second request will be ignored and the
+ first ``google.longrunning.Operation`` created and stored in the backend is returned.
+ :type request_id: str
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+ :type gcp_conn_id: str
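+
+ **Example** (a minimal sketch; the project, region, cluster name and ``dag`` object
+ are illustrative)::
+
+     delete_cluster = DataprocDeleteClusterOperator(
+         task_id='delete_cluster',
+         project_id='my-project',
+         region='europe-west1',
+         cluster_name='cluster-1',
+         dag=dag)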
+ """
+
+ @apply_defaults
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ cluster_name: str,
+ cluster_uuid: Optional[str] = None,
+ request_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ gcp_conn_id: str = "google_cloud_default",
+ *args,
+ **kwargs
+ ):
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.cluster_name = cluster_name
+ self.cluster_uuid = cluster_uuid
+ self.request_id = request_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+
+ def execute(self, context: Dict):
+ hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
+ self.log.info("Deleting cluster: %s", self.cluster_name)
+ operation = hook.delete_cluster(
+ project_id=self.project_id,
+ region=self.region,
+ cluster_name=self.cluster_name,
+ cluster_uuid=self.cluster_uuid,
+ request_id=self.request_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ operation.result()
+ self.log.info("Cluster deleted.")
+
+
+class DataprocJobBaseOperator(BaseOperator):
+ """
+ The base class for operators that launch jobs on DataProc.
+
+ :param job_name: The job name used in the DataProc cluster. This name by default
+ is the task_id appended with the execution date, but can be templated. The
+ name will always be appended with a random number to avoid name clashes.
+ :type job_name: str
+ :param cluster_name: The name of the DataProc cluster.
+ :type cluster_name: str
+ :param dataproc_properties: Map for the Hive properties. Ideal to put in
+ default arguments (templated)
+ :type dataproc_properties: dict
+ :param dataproc_jars: HCFS URIs of jar files to add to the CLASSPATH of the Hive server and Hadoop
+ MapReduce (MR) tasks. Can contain Hive SerDes and UDFs. (templated)
+ :type dataproc_jars: list
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+ :type gcp_conn_id: str
+ :param delegate_to: The account to impersonate, if any.
+ For this to work, the service account making the request must have domain-wide
+ delegation enabled.
+ :type delegate_to: str
+ :param labels: The labels to associate with this job. Label keys must contain 1 to 63 characters,
+ and must conform to RFC 1035. Label values may be empty, but, if present, must contain 1 to 63
+ characters, and must conform to RFC 1035. No more than 32 labels can be associated with a job.
+ :type labels: dict
+ :param region: The specified region where the dataproc cluster is created.
+ :type region: str
+ :param job_error_states: Job states that should be considered error states.
+ Any states in this set will result in an error being raised and failure of the
+ task. Eg, if the ``CANCELLED`` state should also be considered a task failure,
+ pass in ``{'ERROR', 'CANCELLED'}``. Possible values are currently only
+ ``'ERROR'`` and ``'CANCELLED'``, but could change in the future. Defaults to
+ ``{'ERROR'}``.
+ :type job_error_states: set
+ :var dataproc_job_id: The actual "jobId" as submitted to the Dataproc API.
+ This is useful for identifying or linking to the job in the Google Cloud Console
+ Dataproc UI, as the actual "jobId" submitted to the Dataproc API is appended with
+ an 8 character random string.
+ :vartype dataproc_job_id: str
+ """
+ job_type = ""
+
+ @apply_defaults
+ def __init__(self,
+ job_name: str = '{{task.task_id}}_{{ds_nodash}}',
+ cluster_name: str = "cluster-1",
+ dataproc_properties: Optional[Dict] = None,
+ dataproc_jars: Optional[List[str]] = None,
+ gcp_conn_id: str = 'google_cloud_default',
+ delegate_to: Optional[str] = None,
+ labels: Optional[Dict] = None,
+ region: str = 'global',
+ job_error_states: Optional[Set[str]] = None,
+ *args,
+ **kwargs) -> None:
+ super().__init__(*args, **kwargs)
+ self.gcp_conn_id = gcp_conn_id
+ self.delegate_to = delegate_to
+ self.labels = labels
+ self.job_name = job_name
+ self.cluster_name = cluster_name
+ self.dataproc_properties = dataproc_properties
+ self.dataproc_jars = dataproc_jars
+ self.region = region
+ self.job_error_states = job_error_states if job_error_states is not None else {'ERROR'}
+
+ self.hook = DataprocHook(gcp_conn_id=gcp_conn_id)
+ self.project_id = self.hook.project_id
+ self.job_template = None
+ self.job = None
+ self.dataproc_job_id = None
+
+ def create_job_template(self):
+ """
+ Initialize `self.job_template` with default values
+ """
+ self.job_template = DataProcJobBuilder(
+ project_id=self.project_id,
+ task_id=self.task_id,
+ cluster_name=self.cluster_name,
+ job_type=self.job_type,
+ properties=self.dataproc_properties
+ )
+ self.job_template.set_job_name(self.job_name)
+ self.job_template.add_jar_file_uris(self.dataproc_jars)
+ self.job_template.add_labels(self.labels)
+
+ def _generate_job_template(self):
+ if self.job_template:
+ job = self.job_template.build()
+ return job['job']
+ raise Exception("Create a job template before")
+
+ def execute(self, context):
+ if self.job_template:
+ self.job = self.job_template.build()
+ self.dataproc_job_id = self.job["job"]["reference"]["job_id"]
+ self.log.info('Submitting %s job %s', self.job_type, self.dataproc_job_id)
+ job_object = self.hook.submit_job(
+ project_id=self.project_id,
+ job=self.job["job"],
+ location=self.region,
+ )
+ job_id = job_object.reference.job_id
+ self.hook.wait_for_job(
+ job_id=job_id,
+ location=self.region,
+ project_id=self.project_id
+ )
+ self.log.info('Job executed correctly.')
+ else:
+ raise AirflowException("Create a job template before")
+
+ def on_kill(self):
+ """
+ Callback called when the operator is killed.
+ Cancel any running job.
+ """
+ if self.dataproc_job_id:
+ self.hook.cancel_job(
+ project_id=self.project_id,
+ job_id=self.dataproc_job_id,
+ location=self.region
+ )
+
+
+class DataprocSubmitPigJobOperator(DataprocJobBaseOperator):
+ """
+ Start a Pig query Job on a Cloud DataProc cluster. The parameters of the operation
+ will be passed to the cluster.
+
+ It's good practice to define dataproc_* parameters, such as the cluster name
+ and UDFs, in the default_args of the DAG.
+
+ .. code-block:: python
+
+ default_args = {
+ 'cluster_name': 'cluster-1',
+ 'dataproc_pig_jars': [
+ 'gs://example/udf/jar/datafu/1.2.0/datafu.jar',
+ 'gs://example/udf/jar/gpig/1.2/gpig.jar'
+ ]
+ }
+
+ You can pass a Pig script as a string or a file reference. Use ``variables`` to pass
+ values to the Pig script to be resolved on the cluster, or use the parameters to
+ be resolved in the script as template parameters.
+
+ **Example**: ::
+
+ t1 = DataProcPigOperator(
+ task_id='dataproc_pig',
+ query='a_pig_script.pig',
+ variables={'out': 'gs://example/output/{{ds}}'},
+ dag=dag)
+
+ .. seealso::
+ For more detail about job submission, have a look at the reference:
+ https://cloud.google.com/dataproc/reference/rest/v1/projects.regions.jobs
+
+ :param query: The query or reference to the query
+ file (pg or pig extension). (templated)
+ :type query: str
+ :param query_uri: The HCFS URI of the script that contains the Pig queries.
+ :type query_uri: str
+ :param variables: Map of named parameters for the query. (templated)
+ :type variables: dict
+ """
+ template_fields = ['query', 'variables', 'job_name', 'cluster_name',
+ 'region', 'dataproc_jars', 'dataproc_properties']
+ template_ext = ('.pg', '.pig',)
+ ui_color = '#0273d4'
+ job_type = 'pig_job'
+
+ @apply_defaults
+ def __init__(
+ self,
+ query: Optional[str] = None,
+ query_uri: Optional[str] = None,
+ variables: Optional[Dict] = None,
+ *args,
+ **kwargs
+ ) -> None:
+ # TODO: Remove one day
+ warnings.warn(
+ "The `{cls}` operator is deprecated, please use `DataprocSubmitJobOperator` instead. You can use"
+ " `generate_job` method of `{cls}` to generate dictionary representing your job"
+ " and use it with the new operator.".format(cls=type(self).__name__),
+ DeprecationWarning,
+ stacklevel=1
+ )
+
+ super().__init__(*args, **kwargs)
+ self.query = query
+ self.query_uri = query_uri
+ self.variables = variables
+
+ def generate_job(self):
+ """
+ Helper method for easier migration to `DataprocSubmitJobOperator`.
+ :return: Dict representing Dataproc job
+ """
+ self.create_job_template()
+
+ if self.query is None:
+ self.job_template.add_query_uri(self.query_uri)
+ else:
+ self.job_template.add_query(self.query)
+ self.job_template.add_variables(self.variables)
+ return self._generate_job_template()
+
+ def execute(self, context):
+ self.create_job_template()
+
+ if self.query is None:
+ self.job_template.add_query_uri(self.query_uri)
+ else:
+ self.job_template.add_query(self.query)
+ self.job_template.add_variables(self.variables)
+
+ super().execute(context)
+
+
+class DataprocSubmitHiveJobOperator(DataprocJobBaseOperator):
+ """
+ Start a Hive query Job on a Cloud DataProc cluster.
+
+ :param query: The query or reference to the query file (q extension).
+ :type query: str
+ :param query_uri: The HCFS URI of the script that contains the Hive queries.
+ :type query_uri: str
+ :param variables: Map of named parameters for the query.
+ :type variables: dict
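+
+ **Example** (a minimal sketch; the query, cluster name and region are illustrative)::
+
+     hive_query = DataprocSubmitHiveJobOperator(
+         task_id='hive_query',
+         query='SHOW DATABASES;',
+         cluster_name='cluster-1',
+         region='europe-west1',
+         dag=dag)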
+ """
+ template_fields = ['query', 'variables', 'job_name', 'cluster_name',
+ 'region', 'dataproc_jars', 'dataproc_properties']
+ template_ext = ('.q', '.hql',)
+ ui_color = '#0273d4'
+ job_type = 'hive_job'
+
+ @apply_defaults
+ def __init__(
+ self,
+ query: Optional[str] = None,
+ query_uri: Optional[str] = None,
+ variables: Optional[Dict] = None,
+ *args,
+ **kwargs
+ ) -> None:
+ # TODO: Remove one day
+ warnings.warn(
+ "The `{cls}` operator is deprecated, please use `DataprocSubmitJobOperator` instead. You can use"
+ " `generate_job` method of `{cls}` to generate dictionary representing your job"
+ " and use it with the new operator.".format(cls=type(self).__name__),
+ DeprecationWarning,
+ stacklevel=1
+ )
+
+ super().__init__(*args, **kwargs)
+ self.query = query
+ self.query_uri = query_uri
+ self.variables = variables
+ if self.query is not None and self.query_uri is not None:
+ raise AirflowException('Only one of `query` and `query_uri` can be passed.')
+
+ def generate_job(self):
+ """
+ Helper method for easier migration to `DataprocSubmitJobOperator`.
+ :return: Dict representing Dataproc job
+ """
+ self.create_job_template()
+ if self.query is None:
+ self.job_template.add_query_uri(self.query_uri)
+ else:
+ self.job_template.add_query(self.query)
+ self.job_template.add_variables(self.variables)
+ return self._generate_job_template()
+
+ def execute(self, context):
+ self.create_job_template()
+ if self.query is None:
+ self.job_template.add_query_uri(self.query_uri)
+ else:
+ self.job_template.add_query(self.query)
+ self.job_template.add_variables(self.variables)
+
+ super().execute(context)
+
+
+class DataprocSubmitSparkSqlJobOperator(DataprocJobBaseOperator):
+ """
+ Start a Spark SQL query Job on a Cloud DataProc cluster.
+
+ :param query: The query or reference to the query file (q extension). (templated)
+ :type query: str
+ :param query_uri: The HCFS URI of the script that contains the SQL queries.
+ :type query_uri: str
+ :param variables: Map of named parameters for the query. (templated)
+ :type variables: dict
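+
+ **Example** (a minimal sketch; the query, cluster name and region are illustrative)::
+
+     spark_sql_query = DataprocSubmitSparkSqlJobOperator(
+         task_id='spark_sql_query',
+         query='SHOW TABLES;',
+         cluster_name='cluster-1',
+         region='europe-west1',
+         dag=dag)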
+ """
+ template_fields = ['query', 'variables', 'job_name', 'cluster_name',
+ 'region', 'dataproc_jars', 'dataproc_properties']
+ template_ext = ('.q',)
+ ui_color = '#0273d4'
+ job_type = 'spark_sql_job'
+
+ @apply_defaults
+ def __init__(
+ self,
+ query: Optional[str] = None,
+ query_uri: Optional[str] = None,
+ variables: Optional[Dict] = None,
+ *args,
+ **kwargs
+ ) -> None:
+ # TODO: Remove one day
+ warnings.warn(
+ "The `{cls}` operator is deprecated, please use `DataprocSubmitJobOperator` instead. You can use"
+ " `generate_job` method of `{cls}` to generate dictionary representing your job"
+ " and use it with the new operator.".format(cls=type(self).__name__),
+ DeprecationWarning,
+ stacklevel=1
+ )
+
+ super().__init__(*args, **kwargs)
+ self.query = query
+ self.query_uri = query_uri
+ self.variables = variables
+ if self.query is not None and self.query_uri is not None:
+ raise AirflowException('Only one of `query` and `query_uri` can be passed.')
+
+ def generate_job(self):
+ """
+ Helper method for easier migration to `DataprocSubmitJobOperator`.
+ :return: Dict representing Dataproc job
+ """
+ self.create_job_template()
+ if self.query is None:
+ self.job_template.add_query_uri(self.query_uri)
+ else:
+ self.job_template.add_query(self.query)
+ self.job_template.add_variables(self.variables)
+ return self._generate_job_template()
+
+ def execute(self, context):
+ self.create_job_template()
+ if self.query is None:
+ self.job_template.add_query_uri(self.query_uri)
+ else:
+ self.job_template.add_query(self.query)
+ self.job_template.add_variables(self.variables)
+
+ super().execute(context)
+
+
+class DataprocSubmitSparkJobOperator(DataprocJobBaseOperator):
+ """
+ Start a Spark Job on a Cloud DataProc cluster.
+
+ :param main_jar: The HCFS URI of the jar file that contains the main class
+ (use this or the main_class, not both together).
+ :type main_jar: str
+ :param main_class: Name of the job class. (use this or the main_jar, not both
+ together).
+ :type main_class: str
+ :param arguments: Arguments for the job. (templated)
+ :type arguments: list
+ :param archives: List of archived files that will be unpacked in the work
+ directory. Should be stored in Cloud Storage.
+ :type archives: list
+ :param files: List of files to be copied to the working directory
+ :type files: list
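+
+ **Example** (a minimal sketch; the example jar shipped with Dataproc images is assumed,
+ and the class, arguments, cluster name and region are illustrative)::
+
+     spark_pi = DataprocSubmitSparkJobOperator(
+         task_id='spark_pi',
+         main_class='org.apache.spark.examples.SparkPi',
+         dataproc_jars=['file:///usr/lib/spark/examples/jars/spark-examples.jar'],
+         arguments=['1000'],
+         cluster_name='cluster-1',
+         region='europe-west1',
+         dag=dag)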
+ """
+
+ template_fields = ['arguments', 'job_name', 'cluster_name',
+ 'region', 'dataproc_jars', 'dataproc_properties']
+ ui_color = '#0273d4'
+ job_type = 'spark_job'
+
+ @apply_defaults
+ def __init__(
+ self,
+ main_jar: Optional[str] = None,
+ main_class: Optional[str] = None,
+ arguments: Optional[List] = None,
+ archives: Optional[List] = None,
+ files: Optional[List] = None,
+ *args,
+ **kwargs
+ ) -> None:
+ # TODO: Remove one day
+ warnings.warn(
+ "The `{cls}` operator is deprecated, please use `DataprocSubmitJobOperator` instead. You can use"
+ " `generate_job` method of `{cls}` to generate dictionary representing your job"
+ " and use it with the new operator.".format(cls=type(self).__name__),
+ DeprecationWarning,
+ stacklevel=1
+ )
+
+ super().__init__(*args, **kwargs)
+ self.main_jar = main_jar
+ self.main_class = main_class
+ self.arguments = arguments
+ self.archives = archives
+ self.files = files
+
+ def generate_job(self):
+ """
+ Helper method for easier migration to `DataprocSubmitJobOperator`.
+ :return: Dict representing Dataproc job
+ """
+ self.create_job_template()
+ self.job_template.set_main(self.main_jar, self.main_class)
+ self.job_template.add_args(self.arguments)
+ self.job_template.add_archive_uris(self.archives)
+ self.job_template.add_file_uris(self.files)
+ return self._generate_job_template()
+
+ def execute(self, context):
+ self.create_job_template()
+ self.job_template.set_main(self.main_jar, self.main_class)
+ self.job_template.add_args(self.arguments)
+ self.job_template.add_archive_uris(self.archives)
+ self.job_template.add_file_uris(self.files)
+
+ super().execute(context)
+
+
+class DataprocSubmitHadoopJobOperator(DataprocJobBaseOperator):
+ """
+ Start a Hadoop Job on a Cloud DataProc cluster.
+
+ :param main_jar: The HCFS URI of the jar file containing the main class
+ (use this or the main_class, not both together).
+ :type main_jar: str
+ :param main_class: Name of the job class. (use this or the main_jar, not both
+ together).
+ :type main_class: str
+ :param arguments: Arguments for the job. (templated)
+ :type arguments: list
+ :param archives: List of archived files that will be unpacked in the work
+ directory. Should be stored in Cloud Storage.
+ :type archives: list
+ :param files: List of files to be copied to the working directory
+ :type files: list
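+
+ **Example** (a minimal sketch; the example jar shipped with Dataproc images is assumed,
+ and the arguments, bucket, cluster name and region are illustrative)::
+
+     hadoop_wordcount = DataprocSubmitHadoopJobOperator(
+         task_id='hadoop_wordcount',
+         main_jar='file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar',
+         arguments=['wordcount', 'gs://my-bucket/input', 'gs://my-bucket/output'],
+         cluster_name='cluster-1',
+         region='europe-west1',
+         dag=dag)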
+ """
+
+ template_fields = ['arguments', 'job_name', 'cluster_name',
+ 'region', 'dataproc_jars', 'dataproc_properties']
+ ui_color = '#0273d4'
+ job_type = 'hadoop_job'
+
+ @apply_defaults
+ def __init__(
+ self,
+ main_jar: Optional[str] = None,
+ main_class: Optional[str] = None,
+ arguments: Optional[List] = None,
+ archives: Optional[List] = None,
+ files: Optional[List] = None,
+ *args,
+ **kwargs
+ ) -> None:
+ # TODO: Remove one day
+ warnings.warn(
+ "The `{cls}` operator is deprecated, please use `DataprocSubmitJobOperator` instead. You can use"
+ " `generate_job` method of `{cls}` to generate dictionary representing your job"
+ " and use it with the new operator.".format(cls=type(self).__name__),
+ DeprecationWarning,
+ stacklevel=1
+ )
+
+ super().__init__(*args, **kwargs)
+ self.main_jar = main_jar
+ self.main_class = main_class
+ self.arguments = arguments
+ self.archives = archives
+ self.files = files
+
+ def generate_job(self):
+ """
+ Helper method for easier migration to `DataprocSubmitJobOperator`.
+ :return: Dict representing Dataproc job
+ """
+ self.create_job_template()
+ self.job_template.set_main(self.main_jar, self.main_class)
+ self.job_template.add_args(self.arguments)
+ self.job_template.add_archive_uris(self.archives)
+ self.job_template.add_file_uris(self.files)
+ return self._generate_job_template()
+
+ def execute(self, context):
+ self.create_job_template()
+ self.job_template.set_main(self.main_jar, self.main_class)
+ self.job_template.add_args(self.arguments)
+ self.job_template.add_archive_uris(self.archives)
+ self.job_template.add_file_uris(self.files)
+
+ super().execute(context)
+
+
+class DataprocSubmitPySparkJobOperator(DataprocJobBaseOperator):
+ """
+ Start a PySpark Job on a Cloud DataProc cluster.
+
+ :param main: [Required] The Hadoop Compatible Filesystem (HCFS) URI of the main
+ Python file to use as the driver. Must be a .py file. (templated)
+ :type main: str
+ :param arguments: Arguments for the job. (templated)
+ :type arguments: list
+ :param archives: List of archived files that will be unpacked in the work
+ directory. Should be stored in Cloud Storage.
+ :type archives: list
+ :param files: List of files to be copied to the working directory
+ :type files: list
+ :param pyfiles: List of Python files to pass to the PySpark framework.
+ Supported file types: .py, .egg, and .zip
+ :type pyfiles: list
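+
+ **Example** (a minimal sketch; the GCS path, cluster name and region are illustrative;
+ note that ``main`` is templated here, which is what this commit adds to
+ ``template_fields``)::
+
+     pyspark_task = DataprocSubmitPySparkJobOperator(
+         task_id='pyspark_task',
+         main='gs://my-bucket/jobs/{{ ds_nodash }}/job.py',
+         arguments=['--date', '{{ ds }}'],
+         cluster_name='cluster-1',
+         region='europe-west1',
+         dag=dag)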
+ """
+
+ template_fields = ['main', 'arguments', 'job_name', 'cluster_name',
+ 'region', 'dataproc_jars', 'dataproc_properties']
+ ui_color = '#0273d4'
+ job_type = 'pyspark_job'
+
+ @staticmethod
+ def _generate_temp_filename(filename):
+ date = time.strftime('%Y%m%d%H%M%S')
+ return "{}_{}_{}".format(date, str(uuid.uuid4())[:8], ntpath.basename(filename))
+
+ def _upload_file_temp(self, bucket, local_file):
+ """
+ Upload a local file to a Google Cloud Storage bucket.
+ """
+ temp_filename = self._generate_temp_filename(local_file)
+ if not bucket:
+ raise AirflowException(
+ "If you want Airflow to upload the local file to a temporary bucket, set "
+ "the 'temp_bucket' key in the connection string")
+
+ self.log.info("Uploading %s to %s", local_file, temp_filename)
+
+ GCSHook(
+ google_cloud_storage_conn_id=self.gcp_conn_id
+ ).upload(
+ bucket_name=bucket,
+ object_name=temp_filename,
+ mime_type='application/x-python',
+ filename=local_file
+ )
+ return "gs://{}/{}".format(bucket, temp_filename)
+
+ @apply_defaults
+ def __init__(
+ self,
+ main: str,
+ arguments: Optional[List] = None,
+ archives: Optional[List] = None,
+ pyfiles: Optional[List] = None,
+ files: Optional[List] = None,
+ *args,
+ **kwargs
+ ) -> None:
+ # TODO: Remove one day
+ warnings.warn(
+ "The `{cls}` operator is deprecated, please use `DataprocSubmitJobOperator` instead. You can use"
+ " `generate_job` method of `{cls}` to generate dictionary representing your job"
+ " and use it with the new operator.".format(cls=type(self).__name__),
+ DeprecationWarning,
+ stacklevel=1
+ )
+
+ super().__init__(*args, **kwargs)
+ self.main = main
+ self.arguments = arguments
+ self.archives = archives
+ self.files = files
+ self.pyfiles = pyfiles
+
+ def generate_job(self):
+ """
+ Helper method for easier migration to `DataprocSubmitJobOperator`.
+ :return: Dict representing Dataproc job
+ """
+ self.create_job_template()
+ # If the main file is local, rewrite its path to point at the cluster's config bucket
+ if os.path.isfile(self.main):
+ cluster_info = self.hook.get_cluster(
+ project_id=self.hook.project_id,
+ region=self.region,
+ cluster_name=self.cluster_name
+ )
+ bucket = cluster_info['config']['config_bucket']
+ self.main = "gs://{}/{}".format(bucket, self.main)
+ self.job_template.set_python_main(self.main)
+ self.job_template.add_args(self.arguments)
+ self.job_template.add_archive_uris(self.archives)
+ self.job_template.add_file_uris(self.files)
+ self.job_template.add_python_file_uris(self.pyfiles)
+
+ return self._generate_job_template()
+
+ def execute(self, context):
+ self.create_job_template()
+ # If the file is local, upload it to a bucket
+ if os.path.isfile(self.main):
+ cluster_info = self.hook.get_cluster(
+ project_id=self.hook.project_id,
+ region=self.region,
+ cluster_name=self.cluster_name
+ )
+ bucket = cluster_info['config']['config_bucket']
+ self.main = self._upload_file_temp(bucket, self.main)
+
+ self.job_template.set_python_main(self.main)
+ self.job_template.add_args(self.arguments)
+ self.job_template.add_archive_uris(self.archives)
+ self.job_template.add_file_uris(self.files)
+ self.job_template.add_python_file_uris(self.pyfiles)
+
+ super().execute(context)
+
+
+class DataprocInstantiateWorkflowTemplateOperator(BaseOperator):
+ """
+ Instantiate a WorkflowTemplate on Google Cloud Dataproc. The operator will wait
+ until the WorkflowTemplate is finished executing.
+
+ .. seealso::
+ Please refer to:
+ https://cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.workflowTemplates/instantiate
+
+ :param template_id: The id of the template. (templated)
+ :type template_id: str
+ :param project_id: The ID of the google cloud project in which
+ the template runs
+ :type project_id: str
+ :param region: leave as 'global', might become relevant in the future
+ :type region: str
+ :param delegate_to: The account to impersonate, if any.
+ For this to work, the service account making the request must have domain-wide
+ delegation enabled.
+ :type delegate_to: str
+ :param parameters: a map of parameters for Dataproc Template in key-value format:
+ map (key: string, value: string)
+ Example: { "date_from": "2019-08-01", "date_to": "2019-08-02"}.
+ Values may not exceed 100 characters. Please refer to:
+ https://cloud.google.com/dataproc/docs/concepts/workflows/workflow-parameters
+ :type parameters: Dict[str, str]
+ :param request_id: Optional. A unique id used to identify the request. If the server receives two
+ ``InstantiateWorkflowTemplateRequest`` requests with the same id, then the second request will be
+ ignored and the first ``google.longrunning.Operation`` created and stored in the backend is returned.
+ It is recommended to always set this value to a UUID.
+ :type request_id: str
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+ :type gcp_conn_id: str
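+
+ **Example** (a minimal sketch; the template id, project, region and parameter values
+ are illustrative)::
+
+     run_template = DataprocInstantiateWorkflowTemplateOperator(
+         task_id='run_template',
+         template_id='my-template',
+         project_id='my-project',
+         region='europe-west1',
+         parameters={'date_from': '2019-08-01', 'date_to': '2019-08-02'},
+         dag=dag)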
+ """
+
+ template_fields = ['template_id']
+
+ @apply_defaults
+ def __init__( # pylint: disable=too-many-arguments
+ self,
+ template_id: str,
+ region: str,
+ project_id: Optional[str] = None,
+ version: Optional[int] = None,
+ request_id: Optional[str] = None,
+ parameters: Optional[Dict[str, str]] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ gcp_conn_id: str = "google_cloud_default",
+ *args,
+ **kwargs
+ ) -> None:
+ super().__init__(*args, **kwargs)
+
+ self.template_id = template_id
+ self.parameters = parameters
+ self.version = version
+ self.project_id = project_id
+ self.region = region
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.request_id = request_id
+ self.gcp_conn_id = gcp_conn_id
+
+ def execute(self, context):
+ hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
+ self.log.info('Instantiating template %s', self.template_id)
+ operation = hook.instantiate_workflow_template(
+ project_id=self.project_id,
+ location=self.region,
+ template_name=self.template_id,
+ version=self.version,
+ request_id=self.request_id,
+ parameters=self.parameters,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ operation.result()
+ self.log.info('Template instantiated.')
+
+
+class DataprocInstantiateInlineWorkflowTemplateOperator(BaseOperator):
+ """
+ Instantiate a WorkflowTemplate Inline on Google Cloud Dataproc. The operator will
+ wait until the WorkflowTemplate is finished executing.
+
+ .. seealso::
+ Please refer to:
+ https://cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.workflowTemplates/instantiateInline
+
+ :param template: The template contents. (templated)
+ :type template: dict
+ :param project_id: The ID of the google cloud project in which
+ the template runs
+ :type project_id: str
+ :param region: leave as 'global', might become relevant in the future
+ :type region: str
+ :param parameters: a map of parameters for Dataproc Template in key-value format:
+ map (key: string, value: string)
+ Example: { "date_from": "2019-08-01", "date_to": "2019-08-02"}.
+ Values may not exceed 100 characters. Please refer to:
+ https://cloud.google.com/dataproc/docs/concepts/workflows/workflow-parameters
+ :type parameters: Dict[str, str]
+ :param request_id: Optional. A unique id used to identify the request. If the server receives two
+ ``InstantiateWorkflowTemplateRequest`` requests with the same id, then the second request will be
+ ignored and the first ``google.longrunning.Operation`` created and stored in the backend is returned.
+ It is recommended to always set this value to a UUID.
+ :type request_id: str
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+ :type gcp_conn_id: str
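+
+ **Example** (a minimal sketch; ``workflow_template`` is assumed to be a dict built
+ elsewhere in the DAG file following the WorkflowTemplate schema linked above)::
+
+     run_inline_template = DataprocInstantiateInlineWorkflowTemplateOperator(
+         task_id='run_inline_template',
+         template=workflow_template,
+         project_id='my-project',
+         region='europe-west1',
+         dag=dag)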
+ """
+
+ template_fields = ['template']
+
+ @apply_defaults
+ def __init__(
+ self,
+ template: Dict,
+ region: str,
+ project_id: Optional[str] = None,
+ request_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ gcp_conn_id: str = "google_cloud_default",
+ *args,
+ **kwargs
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self.template = template
+ self.project_id = project_id
+ self.location = region
+ self.request_id = request_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+
+ def execute(self, context):
+ self.log.info('Instantiating Inline Template')
+ hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
+ operation = hook.instantiate_inline_workflow_template(
+ template=self.template,
+ project_id=self.project_id,
+ location=self.location,
+ request_id=self.request_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ operation.result()
+ self.log.info('Template instantiated.')
+
+
+class DataprocSubmitJobOperator(BaseOperator):
+ """
+ Submits a job to a cluster.
+
+ :param project_id: Required. The ID of the Google Cloud Platform project that the job belongs to.
+ :type project_id: str
+ :param location: Required. The Cloud Dataproc region in which to handle the request.
+ :type location: str
+ :param job: Required. The job resource.
+ If a dict is provided, it must be of the same form as the protobuf message
+ :class:`~google.cloud.dataproc_v1beta2.types.Job`
+ :type job: Dict
+ :param request_id: Optional. A unique id used to identify the request. If the server receives two
+ ``SubmitJobRequest`` requests with the same id, then the second request will be ignored and the first
+ ``Job`` created and stored in the backend is returned.
+ It is recommended to always set this value to a UUID.
+ :type request_id: str
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+ :type gcp_conn_id: str
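+
+ **Example** (a minimal sketch; the job dict shows the ``pyspark_job`` shape of the
+ ``Job`` message and its values are illustrative)::
+
+     pyspark_job = {
+         'reference': {'project_id': 'my-project'},
+         'placement': {'cluster_name': 'cluster-1'},
+         'pyspark_job': {'main_python_file_uri': 'gs://my-bucket/jobs/job.py'},
+     }
+
+     submit_job = DataprocSubmitJobOperator(
+         task_id='submit_job',
+         project_id='my-project',
+         location='europe-west1',
+         job=pyspark_job,
+         dag=dag)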
+ """
+
+ template_fields = ('project_id', 'location', 'job')
+
+ @apply_defaults
+ def __init__(
+ self,
+ project_id: str,
+ location: str,
+ job: Dict,
+ request_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ gcp_conn_id: str = "google_cloud_default",
+ *args,
+ **kwargs
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.location = location
+ self.job = job
+ self.request_id = request_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+
+ def execute(self, context: Dict):
+ self.log.info("Submitting job")
+ hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
+ job_object = hook.submit_job(
+ project_id=self.project_id,
+ location=self.location,
+ job=self.job,
+ request_id=self.request_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ job_id = job_object.reference.job_id
+ self.log.info("Waiting for job %s to complete", job_id)
+ hook.wait_for_job(
+ job_id=job_id,
+ project_id=self.project_id,
+ location=self.location
+ )
+ self.log.info("Job completed successfully.")
+
+
+class DataprocUpdateClusterOperator(BaseOperator):
+ """
+ Updates a cluster in a project.
+
+ :param project_id: Required. The ID of the Google Cloud Platform project the cluster belongs to.
+ :type project_id: str
+ :param location: Required. The Cloud Dataproc region in which to handle the request.
+ :type location: str
+ :param cluster_name: Required. The cluster name.
+ :type cluster_name: str
+ :param cluster: Required. The changes to the cluster.
+
+ If a dict is provided, it must be of the same form as the protobuf message
+ :class:`~google.cloud.dataproc_v1beta2.types.Cluster`
+ :type cluster: Union[Dict, google.cloud.dataproc_v1beta2.types.Cluster]
+ :param update_mask: Required. Specifies the path, relative to ``Cluster``, of the field to update. For
+ example, to change the number of workers in a cluster to 5, the ``update_mask`` parameter would be
+ specified as ``config.worker_config.num_instances``, and the ``PATCH`` request body would specify the
+ new value. If a dict is provided, it must be of the same form as the protobuf message
+ :class:`~google.cloud.dataproc_v1beta2.types.FieldMask`
+ :type update_mask: Union[Dict, google.cloud.dataproc_v1beta2.types.FieldMask]
+ :param graceful_decommission_timeout: Optional. Timeout for graceful YARN decommissioning. Graceful
+ decommissioning allows removing nodes from the cluster without interrupting jobs in progress. Timeout
+ specifies how long to wait for jobs in progress to finish before forcefully removing nodes (and
+ potentially interrupting jobs). Default timeout is 0 (for forceful decommission), and the maximum
+ allowed timeout is 1 day.
+ :type graceful_decommission_timeout: Union[Dict, google.cloud.dataproc_v1beta2.types.Duration]
+ :param request_id: Optional. A unique id used to identify the request. If the server receives two
+ ``UpdateClusterRequest`` requests with the same id, then the second request will be ignored and the
+ first ``google.longrunning.Operation`` created and stored in the backend is returned.
+ :type request_id: str
+ :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+ :type gcp_conn_id: str
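+
+ **Example** (a minimal sketch that scales a cluster to five workers; the project,
+ region, cluster name and timeout are illustrative)::
+
+     scale_cluster = DataprocUpdateClusterOperator(
+         task_id='scale_cluster',
+         project_id='my-project',
+         location='europe-west1',
+         cluster_name='cluster-1',
+         cluster={'config': {'worker_config': {'num_instances': 5}}},
+         update_mask={'paths': ['config.worker_config.num_instances']},
+         graceful_decommission_timeout={'seconds': 600},
+         dag=dag)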
+ """
+
+ @apply_defaults
+ def __init__( # pylint: disable=too-many-arguments
+ self,
+ location: str,
+ cluster_name: str,
+ cluster: Union[Dict, Cluster],
+ update_mask: Union[Dict, FieldMask],
+ graceful_decommission_timeout: Union[Dict, Duration],
+ request_id: Optional[str] = None,
+ project_id: Optional[str] = None,
+ retry: Retry = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ gcp_conn_id: str = "google_cloud_default",
+ *args,
+ **kwargs
+ ):
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.location = location
+ self.cluster_name = cluster_name
+ self.cluster = cluster
+ self.update_mask = update_mask
+ self.graceful_decommission_timeout = graceful_decommission_timeout
+ self.request_id = request_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+
+ def execute(self, context: Dict):
+ hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
+ self.log.info("Updating %s cluster.", self.cluster_name)
+ operation = hook.update_cluster(
+ project_id=self.project_id,
+ location=self.location,
+ cluster_name=self.cluster_name,
+ cluster=self.cluster,
+ update_mask=self.update_mask,
+ graceful_decommission_timeout=self.graceful_decommission_timeout,
+ request_id=self.request_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ operation.result()
+ self.log.info("Updated %s cluster.", self.cluster_name)