You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by al...@apache.org on 2017/01/09 20:49:14 UTC

incubator-airflow git commit: [AIRFLOW-729] Add Google Cloud Dataproc cluster creation operator

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 617ba7412 -> ffbe7282d


[AIRFLOW-729] Add Google Cloud Dataproc cluster creation operator

The operator checks if there is already a cluster
running with the provided name in the provided
project.
If so, the operator finishes successfully.
Otherwise, the operator issues a REST API call to
initiate the cluster creation and waits until the
creation is successful before exiting.

Closes #1971 from
bodschut/feature/dataproc_operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ffbe7282
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ffbe7282
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ffbe7282

Branch: refs/heads/master
Commit: ffbe7282dcff5d5dd1c23ab0eff27dab2bd457f6
Parents: 617ba74
Author: Bob De Schutter <de...@gmail.com>
Authored: Mon Jan 9 21:49:06 2017 +0100
Committer: Alex Van Boxel <al...@vanboxel.be>
Committed: Mon Jan 9 21:49:06 2017 +0100

----------------------------------------------------------------------
 airflow/contrib/operators/dataproc_operator.py | 253 ++++++++++++++++++++
 1 file changed, 253 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ffbe7282/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index a3df381..9cf2bbe 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -12,9 +12,262 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
+import logging
+import time
+
 from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
+from googleapiclient.errors import HttpError
+
+
+class DataprocClusterCreateOperator(BaseOperator):
+    """
+    Create a new cluster on Google Cloud Dataproc. The operator will wait until the
+    creation is successful or an error occurs in the creation process.
+
+    The parameters allow configuring the cluster. Please refer to
+
+    https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters
+
+    for a detailed explanation on the different parameters. Most of the configuration
+    parameters detailed in the link are available as a parameter to this operator.
+    """
+
+    template_fields = ['cluster_name']
+
+    @apply_defaults
+    def __init__(self,
+                 cluster_name,
+                 project_id,
+                 num_workers,
+                 zone,
+                 storage_bucket=None,
+                 init_actions_uris=None,
+                 metadata=None,
+                 properties=None,
+                 master_machine_type='n1-standard-4',
+                 master_disk_size=500,
+                 worker_machine_type='n1-standard-4',
+                 worker_disk_size=500,
+                 num_preemptible_workers=0,
+                 labels=None,
+                 region='global',
+                 google_cloud_conn_id='google_cloud_default',
+                 delegate_to=None,
+                 *args,
+                 **kwargs):
+        """
+        Create a new DataprocClusterCreateOperator.
+
+        For more info on the creation of a cluster through the API, have a look at:
+
+        https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters
+
+        :param cluster_name: The name of the cluster to create
+        :type cluster_name: string
+        :param project_id: The ID of the google cloud project in which
+            to create the cluster
+        :type project_id: string
+        :param num_workers: The # of workers to spin up
+        :type num_workers: int
+        :param storage_bucket: The storage bucket to use, setting to None lets dataproc
+            generate a custom one for you
+        :type storage_bucket: string
+        :param init_actions_uris: List of GCS URIs containing
+            dataproc initialization scripts
+        :type init_actions_uris: list[string]
+        :param metadata: dict of key-value google compute engine metadata entries
+            to add to all instances
+        :type metadata: dict
+        :param properties: dict of properties to set on
+            config files (e.g. spark-defaults.conf), see
+            https://cloud.google.com/dataproc/docs/reference/rest/v1/ \
+            projects.regions.clusters#SoftwareConfig
+        :type properties: dict
+        :param master_machine_type: Compute engine machine type to use for the master node
+        :type master_machine_type: string
+        :param master_disk_size: Disk size for the master node
+        :type master_disk_size: int
+        :param worker_machine_type: Compute engine machine type to use for the worker nodes
+        :type worker_machine_type: string
+        :param worker_disk_size: Disk size for the worker nodes
+        :type worker_disk_size: int
+        :param num_preemptible_workers: The # of preemptible worker nodes to spin up
+        :type num_preemptible_workers: int
+        :param labels: dict of labels to add to the cluster
+        :type labels: dict
+        :param zone: The zone where the cluster will be located
+        :type zone: string
+        :param region: leave as 'global', might become relevant in the future
+        :type region: string
+        :param google_cloud_conn_id: The connection id to use when connecting to dataproc
+        :type google_cloud_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request must have domain-wide
+            delegation enabled.
+        :type delegate_to: string
+        """
+        super(DataprocClusterCreateOperator, self).__init__(*args, **kwargs)
+        self.google_cloud_conn_id = google_cloud_conn_id
+        self.delegate_to = delegate_to
+        self.cluster_name = cluster_name
+        self.project_id = project_id
+        self.num_workers = num_workers
+        self.num_preemptible_workers = num_preemptible_workers
+        self.storage_bucket = storage_bucket
+        self.init_actions_uris = init_actions_uris
+        self.metadata = metadata
+        self.properties = properties
+        self.master_machine_type = master_machine_type
+        self.master_disk_size = master_disk_size
+        self.worker_machine_type = worker_machine_type
+        self.worker_disk_size = worker_disk_size
+        self.labels = labels
+        self.zone = zone
+        self.region = region
+
+    def _get_cluster_list_for_project(self, service):
+        """Return the 'clusters' list from one list() call for project/region, or []."""
+        result = service.projects().regions().clusters().list(
+            projectId=self.project_id,
+            region=self.region
+        ).execute()
+        return result.get('clusters', [])
+
+    def _get_cluster(self, service):
+        """Return the first listed cluster named self.cluster_name, or None if absent."""
+        clusters = self._get_cluster_list_for_project(service)
+        matches = (c for c in clusters if c['clusterName'] == self.cluster_name)
+        return next(matches, None)
+
+    def _get_cluster_state(self, service):
+        """Return the cluster's status state, or None if it is not listed or has no status."""
+        cluster = self._get_cluster(service)
+        if cluster and 'status' in cluster:
+            return cluster['status']['state']
+        return None
+
+    def _cluster_ready(self, state, service):
+        """Return True if state is RUNNING, False if pending; raise Exception on ERROR."""
+        if state == 'RUNNING':
+            return True
+        if state == 'ERROR':
+            cluster = self._get_cluster(service)
+            try:
+                error_details = cluster['status']['details']
+            except (KeyError, TypeError):  # no details, or cluster no longer listed
+                error_details = 'Unknown error in cluster creation, ' \
+                                'check Google Cloud console for details.'
+            raise Exception(error_details)
+        return False
+
+    def _wait_for_done(self, service):
+        """Poll the cluster state every 15 seconds until _cluster_ready returns True."""
+        while True:
+            state = self._get_cluster_state(service)
+            if state is None:
+                logging.info("No state for cluster '%s'", self.cluster_name)
+            else:
+                logging.info("State for cluster '%s' is %s", self.cluster_name, state)
+                if self._cluster_ready(state, service):
+                    logging.info("Cluster '%s' successfully created",
+                                 self.cluster_name)
+                    return
+            time.sleep(15)
+
+    def execute(self, context):
+        """Create the cluster unless it already exists, then wait until it is RUNNING."""
+        hook = DataProcHook(
+            gcp_conn_id=self.google_cloud_conn_id,
+            delegate_to=self.delegate_to
+        )
+        service = hook.get_conn()
+
+        if self._get_cluster(service):
+            logging.info('Cluster %s already exists... Checking status...',
+                         self.cluster_name)
+            self._wait_for_done(service)
+            return True
+
+        zone_uri = \
+            'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
+                self.project_id, self.zone
+            )
+        master_type_uri = \
+            "https://www.googleapis.com/compute/v1/projects/{}/zones/{}/machineTypes/{}".format(
+                self.project_id, self.zone, self.master_machine_type
+            )
+        worker_type_uri = \
+            "https://www.googleapis.com/compute/v1/projects/{}/zones/{}/machineTypes/{}".format(
+                self.project_id, self.zone, self.worker_machine_type
+            )
+        cluster_data = {
+            'projectId': self.project_id,
+            'clusterName': self.cluster_name,
+            'config': {
+                'gceClusterConfig': {
+                    'zoneUri': zone_uri
+                },
+                'masterConfig': {
+                    'numInstances': 1,
+                    'machineTypeUri': master_type_uri,
+                    'diskConfig': {
+                        'bootDiskSizeGb': self.master_disk_size
+                    }
+                },
+                'workerConfig': {
+                    'numInstances': self.num_workers,
+                    'machineTypeUri': worker_type_uri,
+                    'diskConfig': {
+                        'bootDiskSizeGb': self.worker_disk_size
+                    }
+                },
+                'secondaryWorkerConfig': {},
+                'softwareConfig': {}
+            }
+        }
+        if self.num_preemptible_workers > 0:
+            cluster_data['config']['secondaryWorkerConfig'] = {
+                'numInstances': self.num_preemptible_workers,
+                'machineTypeUri': worker_type_uri,
+                'diskConfig': {
+                    'bootDiskSizeGb': self.worker_disk_size
+                },
+                'isPreemptible': True
+            }
+        if self.labels:
+            cluster_data['labels'] = self.labels
+        if self.storage_bucket:
+            cluster_data['config']['configBucket'] = self.storage_bucket
+        if self.metadata:
+            cluster_data['config']['gceClusterConfig']['metadata'] = self.metadata
+        if self.properties:
+            cluster_data['config']['softwareConfig']['properties'] = self.properties
+        if self.init_actions_uris:
+            init_actions_dict = [
+                {'executableFile': uri} for uri in self.init_actions_uris
+            ]
+            cluster_data['config']['initializationActions'] = init_actions_dict
+
+        try:
+            service.projects().regions().clusters().create(
+                projectId=self.project_id,
+                region=self.region,
+                body=cluster_data
+            ).execute()
+        except HttpError as e:
+            # probably two cluster start commands at the same time
+            time.sleep(10)
+            if not self._get_cluster(service):
+                raise e
+            logging.info('Cluster %s already exists... Checking status...',
+                         self.cluster_name)
+            self._wait_for_done(service)
+            return True
+
+        self._wait_for_done(service)
 
 
 class DataProcPigOperator(BaseOperator):