You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by al...@apache.org on 2017/01/09 20:49:14 UTC
incubator-airflow git commit: [AIRFLOW-729] Add Google Cloud Dataproc
cluster creation operator
Repository: incubator-airflow
Updated Branches:
refs/heads/master 617ba7412 -> ffbe7282d
[AIRFLOW-729] Add Google Cloud Dataproc cluster creation operator
The operator checks if there is already a cluster
running with the provided name in the provided
project.
If so, the operator finishes successfully.
Otherwise, the operator issues a REST API call to initiate
the cluster creation and waits until the creation
is successful before exiting.
Closes #1971 from
bodschut/feature/dataproc_operator
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ffbe7282
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ffbe7282
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ffbe7282
Branch: refs/heads/master
Commit: ffbe7282dcff5d5dd1c23ab0eff27dab2bd457f6
Parents: 617ba74
Author: Bob De Schutter <de...@gmail.com>
Authored: Mon Jan 9 21:49:06 2017 +0100
Committer: Alex Van Boxel <al...@vanboxel.be>
Committed: Mon Jan 9 21:49:06 2017 +0100
----------------------------------------------------------------------
airflow/contrib/operators/dataproc_operator.py | 253 ++++++++++++++++++++
1 file changed, 253 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ffbe7282/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index a3df381..9cf2bbe 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -12,9 +12,262 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+
+import logging
+import time
+
from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
+from googleapiclient.errors import HttpError
+
+
class DataprocClusterCreateOperator(BaseOperator):
    """
    Create a new cluster on Google Cloud Dataproc. The operator will wait until the
    creation is successful or an error occurs in the creation process.

    The operator is idempotent: if a cluster with the given name already exists in
    the project, it only waits for that cluster to become usable.

    The parameters allow configuring the cluster. Please refer to

    https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters

    for a detailed explanation on the different parameters. Most of the configuration
    parameters detailed in the link are available as a parameter to this operator.
    """

    # cluster_name may contain Jinja templates (e.g. an execution-date suffix).
    template_fields = ['cluster_name']

    @apply_defaults
    def __init__(self,
                 cluster_name,
                 project_id,
                 num_workers,
                 zone,
                 storage_bucket=None,
                 init_actions_uris=None,
                 metadata=None,
                 properties=None,
                 master_machine_type='n1-standard-4',
                 master_disk_size=500,
                 worker_machine_type='n1-standard-4',
                 worker_disk_size=500,
                 num_preemptible_workers=0,
                 labels=None,
                 region='global',
                 google_cloud_conn_id='google_cloud_default',
                 delegate_to=None,
                 *args,
                 **kwargs):
        """
        Create a new DataprocClusterCreateOperator.

        For more info on the creation of a cluster through the API, have a look at:

        https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters

        :param cluster_name: The name of the cluster to create
        :type cluster_name: string
        :param project_id: The ID of the google cloud project in which
            to create the cluster
        :type project_id: string
        :param num_workers: The # of workers to spin up
        :type num_workers: int
        :param zone: The zone where the cluster will be located
        :type zone: string
        :param storage_bucket: The storage bucket to use, setting to None lets dataproc
            generate a custom one for you
        :type storage_bucket: string
        :param init_actions_uris: List of GCS uri's containing
            dataproc initialization scripts
        :type init_actions_uris: list[string]
        :param metadata: dict of key-value google compute engine metadata entries
            to add to all instances
        :type metadata: dict
        :param properties: dict of properties to set on
            config files (e.g. spark-defaults.conf), see
            https://cloud.google.com/dataproc/docs/reference/rest/v1/ \
            projects.regions.clusters#SoftwareConfig
        :type properties: dict
        :param master_machine_type: Compute engine machine type to use for the master node
        :type master_machine_type: string
        :param master_disk_size: Disk size for the master node
        :type master_disk_size: int
        :param worker_machine_type: Compute engine machine type to use for the worker nodes
        :type worker_machine_type: string
        :param worker_disk_size: Disk size for the worker nodes
        :type worker_disk_size: int
        :param num_preemptible_workers: The # of preemptible worker nodes to spin up
        :type num_preemptible_workers: int
        :param labels: dict of labels to add to the cluster
        :type labels: dict
        :param region: leave as 'global', might become relevant in the future
        :type region: string
        :param google_cloud_conn_id: The connection id to use when connecting to dataproc
        :type google_cloud_conn_id: string
        :param delegate_to: The account to impersonate, if any.
            For this to work, the service account making the request must have domain-wide
            delegation enabled.
        :type delegate_to: string
        """
        super(DataprocClusterCreateOperator, self).__init__(*args, **kwargs)
        self.google_cloud_conn_id = google_cloud_conn_id
        self.delegate_to = delegate_to
        self.cluster_name = cluster_name
        self.project_id = project_id
        self.num_workers = num_workers
        self.num_preemptible_workers = num_preemptible_workers
        self.storage_bucket = storage_bucket
        self.init_actions_uris = init_actions_uris
        self.metadata = metadata
        self.properties = properties
        self.master_machine_type = master_machine_type
        self.master_disk_size = master_disk_size
        self.worker_machine_type = worker_machine_type
        self.worker_disk_size = worker_disk_size
        self.labels = labels
        self.zone = zone
        self.region = region

    def _get_cluster_list_for_project(self, service):
        """Return the list of cluster resources in the configured project/region.

        Returns [] when the project has no clusters (the 'clusters' key is
        absent from the API response in that case).
        """
        result = service.projects().regions().clusters().list(
            projectId=self.project_id,
            region=self.region
        ).execute()
        return result.get('clusters', [])

    def _get_cluster(self, service):
        """Return the cluster resource named ``self.cluster_name``, or None."""
        for candidate in self._get_cluster_list_for_project(service):
            if candidate['clusterName'] == self.cluster_name:
                return candidate
        return None

    def _get_cluster_state(self, service):
        """Return the cluster's state string (e.g. 'RUNNING'), or None if unknown."""
        cluster = self._get_cluster(service)
        # Guard against the cluster disappearing between API calls: the
        # original code raised TypeError on a None cluster here.
        if cluster and 'status' in cluster:
            return cluster['status']['state']
        return None

    def _cluster_ready(self, state, service):
        """Return True when the cluster is RUNNING; raise on ERROR state.

        Any other state (e.g. CREATING) returns False so the caller keeps
        polling.
        """
        if state == 'RUNNING':
            return True
        if state == 'ERROR':
            cluster = self._get_cluster(service)
            try:
                error_details = cluster['status']['details']
            except KeyError:
                error_details = 'Unknown error in cluster creation, ' \
                                'check Google Cloud console for details.'
            raise Exception(error_details)
        return False

    def _wait_for_done(self, service):
        """Poll every 15 seconds until the cluster is RUNNING (or raise on ERROR)."""
        while True:
            state = self._get_cluster_state(service)
            if state is None:
                logging.info("No state for cluster '%s'", self.cluster_name)
                time.sleep(15)
            else:
                logging.info("State for cluster '%s' is %s", self.cluster_name, state)
                if self._cluster_ready(state, service):
                    logging.info("Cluster '%s' successfully created",
                                 self.cluster_name)
                    return
                time.sleep(15)

    def _build_cluster_data(self):
        """Build the JSON request body for the clusters.create API call."""
        zone_uri = \
            'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
                self.project_id, self.zone
            )
        master_type_uri = \
            "https://www.googleapis.com/compute/v1/projects/{}/zones/{}/machineTypes/{}".format(
                self.project_id, self.zone, self.master_machine_type
            )
        worker_type_uri = \
            "https://www.googleapis.com/compute/v1/projects/{}/zones/{}/machineTypes/{}".format(
                self.project_id, self.zone, self.worker_machine_type
            )
        cluster_data = {
            'projectId': self.project_id,
            'clusterName': self.cluster_name,
            'config': {
                'gceClusterConfig': {
                    'zoneUri': zone_uri
                },
                'masterConfig': {
                    'numInstances': 1,
                    'machineTypeUri': master_type_uri,
                    'diskConfig': {
                        'bootDiskSizeGb': self.master_disk_size
                    }
                },
                'workerConfig': {
                    'numInstances': self.num_workers,
                    'machineTypeUri': worker_type_uri,
                    'diskConfig': {
                        'bootDiskSizeGb': self.worker_disk_size
                    }
                },
                'secondaryWorkerConfig': {},
                'softwareConfig': {}
            }
        }
        if self.num_preemptible_workers > 0:
            # Preemptible workers reuse the regular worker machine type and
            # disk size.
            cluster_data['config']['secondaryWorkerConfig'] = {
                'numInstances': self.num_preemptible_workers,
                'machineTypeUri': worker_type_uri,
                'diskConfig': {
                    'bootDiskSizeGb': self.worker_disk_size
                },
                'isPreemptible': True
            }
        if self.labels:
            cluster_data['labels'] = self.labels
        if self.storage_bucket:
            cluster_data['config']['configBucket'] = self.storage_bucket
        if self.metadata:
            cluster_data['config']['gceClusterConfig']['metadata'] = self.metadata
        if self.properties:
            cluster_data['config']['softwareConfig']['properties'] = self.properties
        if self.init_actions_uris:
            # The API expects a list of {'executableFile': uri} objects.
            init_actions = [
                {'executableFile': uri} for uri in self.init_actions_uris
            ]
            cluster_data['config']['initializationActions'] = init_actions
        return cluster_data

    def execute(self, context):
        """Create the cluster, or wait on an existing cluster with the same name.

        Returns True when the cluster already existed; otherwise returns None
        after the newly created cluster reaches the RUNNING state.

        :raises Exception: if the cluster ends up in the ERROR state.
        """
        hook = DataProcHook(
            gcp_conn_id=self.google_cloud_conn_id,
            delegate_to=self.delegate_to
        )
        service = hook.get_conn()

        if self._get_cluster(service):
            logging.info('Cluster %s already exists... Checking status...',
                         self.cluster_name)
            self._wait_for_done(service)
            return True

        cluster_data = self._build_cluster_data()
        try:
            service.projects().regions().clusters().create(
                projectId=self.project_id,
                region=self.region,
                body=cluster_data
            ).execute()
        except HttpError:
            # Probably two cluster start commands raced; if the cluster now
            # exists, treat it as ours and just wait for it.
            time.sleep(10)
            if self._get_cluster(service):
                logging.info('Cluster %s already exists... Checking status...',
                             self.cluster_name)
                self._wait_for_done(service)
                return True
            # Genuine API failure: bare `raise` preserves the original
            # traceback (the original `raise e` discarded it).
            raise

        self._wait_for_done(service)
class DataProcPigOperator(BaseOperator):