Posted to commits@airflow.apache.org by cr...@apache.org on 2018/01/24 15:24:04 UTC

incubator-airflow git commit: [AIRFLOW-2016] Add support for Dataproc Workflow Templates

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 18d09a948 -> 0990ba8c0


[AIRFLOW-2016] Add support for Dataproc Workflow Templates

Closes #2958 from DanSedov/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0990ba8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0990ba8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0990ba8c

Branch: refs/heads/master
Commit: 0990ba8c027a6d438a207c6afc202a7e40ebbc7f
Parents: 18d09a9
Author: Dan Sedov <se...@google.com>
Authored: Wed Jan 24 07:23:48 2018 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Wed Jan 24 07:23:48 2018 -0800

----------------------------------------------------------------------
 airflow/contrib/hooks/gcp_dataproc_hook.py      |  68 ++++++++--
 airflow/contrib/operators/dataproc_operator.py  | 115 +++++++++++++++-
 .../contrib/operators/test_dataproc_operator.py | 134 ++++++++++++++++++-
 3 files changed, 301 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0990ba8c/airflow/contrib/hooks/gcp_dataproc_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py
index 5b96484..bc5aa83 100644
--- a/airflow/contrib/hooks/gcp_dataproc_hook.py
+++ b/airflow/contrib/hooks/gcp_dataproc_hook.py
@@ -141,24 +141,76 @@ class _DataProcJobBuilder:
         return self.job
 
 
+class _DataProcOperation(LoggingMixin):
+    """Continuously polls Dataproc Operation until it completes."""
+    def __init__(self, dataproc_api, operation):
+        self.dataproc_api = dataproc_api
+        self.operation = operation
+        self.operation_name = self.operation['name']
+
+    def wait_for_done(self):
+        if self._check_done():
+            return True
+
+        self.log.info(
+            'Waiting for Dataproc Operation %s to finish', self.operation_name)
+        while True:
+            time.sleep(10)
+            self.operation = (
+                self.dataproc_api.projects()
+                .regions()
+                .operations()
+                .get(name=self.operation_name)
+                .execute(num_retries=5))
+
+            if self._check_done():
+                return True
+
+    def get(self):
+        return self.operation
+
+    def _check_done(self):
+        if 'done' in self.operation:
+            if 'error' in self.operation:
+                self.log.warning(
+                    'Dataproc Operation %s failed with error: %s',
+                    self.operation_name, self.operation['error']['message'])
+                self._raise_error()
+            else:
+                self.log.info(
+                    'Dataproc Operation %s done', self.operation['name'])
+                return True
+        return False
+
+    def _raise_error(self):
+        raise Exception('Google Dataproc Operation %s failed: %s' %
+                        (self.operation_name, self.operation['error']['message']))
+
+
 class DataProcHook(GoogleCloudBaseHook):
+    """Hook for Google Cloud Dataproc APIs."""
     def __init__(self,
                  gcp_conn_id='google_cloud_default',
-                 delegate_to=None):
+                 delegate_to=None,
+                 api_version='v1'):
         super(DataProcHook, self).__init__(gcp_conn_id, delegate_to)
+        self.api_version = api_version
 
     def get_conn(self):
-        """
-        Returns a Google Cloud DataProc service object.
-        """
+        """Returns a Google Cloud Dataproc service object."""
         http_authorized = self._authorize()
-        return build('dataproc', 'v1', http=http_authorized)
+        return build('dataproc', self.api_version, http=http_authorized)
 
     def submit(self, project_id, job, region='global'):
         submitted = _DataProcJob(self.get_conn(), project_id, job, region)
         if not submitted.wait_for_done():
-            submitted.raise_error("DataProcTask has errors")
+            submitted.raise_error('DataProcTask has errors')
 
     def create_job_template(self, task_id, cluster_name, job_type, properties):
-        return _DataProcJobBuilder(self.project_id, task_id, cluster_name, job_type,
-                                   properties)
+        return _DataProcJobBuilder(self.project_id, task_id, cluster_name,
+                                   job_type, properties)
+
+    def await(self, operation):
+        """Awaits for Google Cloud Dataproc Operation to complete."""
+        submitted = _DataProcOperation(self.get_conn(), operation)
+        submitted.wait_for_done()
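
(For orientation, not part of the diff: a minimal sketch of how the new hook
surface could be driven directly. The project, template name, and instance ID
below are placeholders; the v1beta2 API version matches what the new
workflow-template operators request.)

    from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook

    hook = DataProcHook(gcp_conn_id='google_cloud_default',
                        api_version='v1beta2')
    # Kick off a long-running Dataproc operation (here: instantiating an
    # existing workflow template), then block until it reports 'done'.
    operation = (
        hook.get_conn().projects().regions().workflowTemplates()
        .instantiate(
            name=('projects/my-project/regions/global/'
                  'workflowTemplates/my-template'),
            body={'instanceId': 'example-instance-id'})
        .execute())
    hook.await(operation)  # polls operations().get() every 10 seconds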

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0990ba8c/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 3b10382..ece6d51 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -14,8 +14,11 @@
 #
 
 import time
+import uuid
+
 
 from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook
+from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.version import version
@@ -92,7 +95,7 @@ class DataprocClusterCreateOperator(BaseOperator):
     :type service_account_scopes: list[string]
     """
 
-    template_fields = ['cluster_name', ]
+    template_fields = ['cluster_name']
 
     @apply_defaults
     def __init__(self,
@@ -928,3 +931,113 @@ class DataProcPySparkOperator(BaseOperator):
         job.set_job_name(self.job_name)
 
         hook.submit(hook.project_id, job.build(), self.region)
+
+
+class DataprocWorkflowTemplateBaseOperator(BaseOperator):
+    template_fields = ['template_id', 'template']
+
+    @apply_defaults
+    def __init__(self,
+                 project_id,
+                 region='global',
+                 gcp_conn_id='google_cloud_default',
+                 delegate_to=None,
+                 *args,
+                 **kwargs):
+        super(DataprocWorkflowTemplateBaseOperator, self).__init__(*args, **kwargs)
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.project_id = project_id
+        self.region = region
+        self.hook = DataProcHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            api_version='v1beta2'
+        )
+
+    def execute(self, context):
+        self.hook.await(self.start())
+
+    def start(self):
+        raise AirflowException('Please start a workflow operation')
+
+
+class DataprocWorkflowTemplateInstantiateOperator(DataprocWorkflowTemplateBaseOperator):
+    """
+    Instantiate a WorkflowTemplate on Google Cloud Dataproc. The operator will wait
+    until the WorkflowTemplate is finished executing.
+
+    Please refer to:
+    https://cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.workflowTemplates/instantiate
+
+    :param template_id: The ID of the template.
+    :type template_id: string
+    :param project_id: The ID of the Google Cloud project in which
+        the template runs.
+    :type project_id: string
+    :param region: leave as 'global'; might become relevant in the future.
+    :type region: string
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: string
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: string
+    """
+
+    @apply_defaults
+    def __init__(self, template_id, *args, **kwargs):
+        (super(DataprocWorkflowTemplateInstantiateOperator, self)
+            .__init__(*args, **kwargs))
+        self.template_id = template_id
+
+    def start(self):
+        self.log.info('Instantiating Template: %s', self.template_id)
+        return (
+            self.hook.get_conn().projects().regions().workflowTemplates()
+            .instantiate(
+                name=('projects/%s/regions/%s/workflowTemplates/%s' %
+                      (self.project_id, self.region, self.template_id)),
+                body={'instanceId': str(uuid.uuid1())})
+            .execute())
+
+
+class DataprocWorkflowTemplateInstantiateInlineOperator(
+        DataprocWorkflowTemplateBaseOperator):
+    """
+    Instantiate a WorkflowTemplate Inline on Google Cloud Dataproc. The operator will
+    wait until the WorkflowTemplate is finished executing.
+
+    Please refer to:
+    https://cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.workflowTemplates/instantiateInline
+
+    :param template: The template contents.
+    :type template: map
+    :param project_id: The ID of the Google Cloud project in which
+        the template runs.
+    :type project_id: string
+    :param region: leave as 'global'; might become relevant in the future.
+    :type region: string
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: string
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: string
+    """
+
+    @apply_defaults
+    def __init__(self, template, *args, **kwargs):
+        (super(DataprocWorkflowTemplateInstantiateInlineOperator, self)
+            .__init__(*args, **kwargs))
+        self.template = template
+
+    def start(self):
+        self.log.info('Instantiating Inline Template')
+        return (
+            self.hook.get_conn().projects().regions().workflowTemplates()
+            .instantiateInline(
+                parent='projects/%s/regions/%s' % (self.project_id, self.region),
+                instanceId=str(uuid.uuid1()),
+                body=self.template)
+            .execute())
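
(For reference, not part of the diff: a minimal DAG sketch using the two new
operators. Project, region, template ID, and the inline template body are
illustrative placeholders; the inline body mirrors the example used in the
tests below.)

    import datetime

    from airflow import DAG
    from airflow.contrib.operators.dataproc_operator import (
        DataprocWorkflowTemplateInstantiateInlineOperator,
        DataprocWorkflowTemplateInstantiateOperator)

    dag = DAG('dataproc_workflow_example',
              start_date=datetime.datetime(2018, 1, 1),
              schedule_interval=None)

    # Instantiate an existing workflow template by ID; the operator blocks
    # until the workflow finishes.
    instantiate = DataprocWorkflowTemplateInstantiateOperator(
        task_id='instantiate_template',
        project_id='my-project',
        region='global',
        template_id='my-workflow-template',
        dag=dag)

    # Instantiate a template defined inline, following the v1beta2
    # WorkflowTemplate schema linked in the operator docstrings above.
    instantiate_inline = DataprocWorkflowTemplateInstantiateInlineOperator(
        task_id='instantiate_inline_template',
        project_id='my-project',
        region='global',
        template={
            'placement': {
                'managed_cluster': {
                    'cluster_name': 'example-cluster',
                    'config': {
                        'gce_cluster_config': {'zone_uri': 'us-central1-a'},
                    },
                },
            },
            'jobs': [{'step_id': 'say-hello',
                      'pig_job': {'query': 'sh echo hello'}}],
        },
        dag=dag)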

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0990ba8c/tests/contrib/operators/test_dataproc_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py
index 2df056a..448da86 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -18,12 +18,15 @@ import re
 import unittest
 
 from airflow import DAG
-from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator
-from airflow.contrib.operators.dataproc_operator import DataprocClusterDeleteOperator
-from airflow.contrib.operators.dataproc_operator import DataProcHadoopOperator
-from airflow.contrib.operators.dataproc_operator import DataProcHiveOperator
-from airflow.contrib.operators.dataproc_operator import DataProcPySparkOperator
-from airflow.contrib.operators.dataproc_operator import DataProcSparkOperator
+from airflow.contrib.operators.dataproc_operator import \
+    DataprocClusterCreateOperator,\
+    DataprocClusterDeleteOperator,\
+    DataProcHadoopOperator,\
+    DataProcHiveOperator,\
+    DataProcPySparkOperator,\
+    DataProcSparkOperator,\
+    DataprocWorkflowTemplateInstantiateInlineOperator,\
+    DataprocWorkflowTemplateInstantiateOperator
 from airflow.version import version
 
 from copy import deepcopy
@@ -55,7 +58,7 @@ WORKER_MACHINE_TYPE = 'n1-standard-2'
 WORKER_DISK_SIZE = 100
 NUM_PREEMPTIBLE_WORKERS = 2
 LABEL1 = {}
-LABEL2 = {'application':'test', 'year': 2017}
+LABEL2 = {'application': 'test', 'year': 2017}
 SERVICE_ACCOUNT_SCOPES = [
     'https://www.googleapis.com/auth/bigquery',
     'https://www.googleapis.com/auth/bigtable.data'
@@ -63,6 +66,10 @@ SERVICE_ACCOUNT_SCOPES = [
 DEFAULT_DATE = datetime.datetime(2017, 6, 6)
 REGION = 'test-region'
 MAIN_URI = 'test-uri'
+TEMPLATE_ID = 'template-id'
+
+HOOK = 'airflow.contrib.operators.dataproc_operator.DataProcHook'
+
 
 class DataprocClusterCreateOperatorTest(unittest.TestCase):
     # Unit test for the DataprocClusterCreateOperator
@@ -290,3 +297,116 @@ class DataProcSparkOperatorTest(unittest.TestCase):
 
             dataproc_task.execute(None)
             mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY, REGION)
+
+
+class DataprocWorkflowTemplateInstantiateOperatorTest(unittest.TestCase):
+    def setUp(self):
+        # Setup service.projects().regions().workflowTemplates().instantiate().execute()
+        self.operation = {'name': 'operation', 'done': True}
+        self.mock_execute = Mock()
+        self.mock_execute.execute.return_value = self.operation
+        self.mock_workflows = Mock()
+        self.mock_workflows.instantiate.return_value = self.mock_execute
+        self.mock_regions = Mock()
+        self.mock_regions.workflowTemplates.return_value = self.mock_workflows
+        self.mock_projects = Mock()
+        self.mock_projects.regions.return_value = self.mock_regions
+        self.mock_conn = Mock()
+        self.mock_conn.projects.return_value = self.mock_projects
+        self.dag = DAG(
+            'test_dag',
+            default_args={
+                'owner': 'airflow',
+                'start_date': DEFAULT_DATE,
+                'end_date': DEFAULT_DATE,
+            },
+            schedule_interval='@daily')
+
+    def test_workflow(self):
+        with patch(HOOK) as MockHook:
+            hook = MockHook()
+            hook.get_conn.return_value = self.mock_conn
+            hook.await.return_value = None
+
+            dataproc_task = DataprocWorkflowTemplateInstantiateOperator(
+                task_id=TASK_ID,
+                project_id=PROJECT_ID,
+                region=REGION,
+                template_id=TEMPLATE_ID,
+                dag=self.dag
+            )
+
+            dataproc_task.execute(None)
+            template_name = (
+                'projects/test-project-id/regions/test-region/'
+                'workflowTemplates/template-id')
+            self.mock_workflows.instantiate.assert_called_once_with(
+                name=template_name,
+                body=mock.ANY)
+            hook.await.assert_called_once_with(self.operation)
+
+
+class DataprocWorkflowTemplateInstantiateInlineOperatorTest(unittest.TestCase):
+    def setUp(self):
+        # Setup service.projects().regions().workflowTemplates().instantiateInline()
+        #              .execute()
+        self.operation = {'name': 'operation', 'done': True}
+        self.mock_execute = Mock()
+        self.mock_execute.execute.return_value = self.operation
+        self.mock_workflows = Mock()
+        self.mock_workflows.instantiateInline.return_value = self.mock_execute
+        self.mock_regions = Mock()
+        self.mock_regions.workflowTemplates.return_value = self.mock_workflows
+        self.mock_projects = Mock()
+        self.mock_projects.regions.return_value = self.mock_regions
+        self.mock_conn = Mock()
+        self.mock_conn.projects.return_value = self.mock_projects
+        self.dag = DAG(
+            'test_dag',
+            default_args={
+                'owner': 'airflow',
+                'start_date': DEFAULT_DATE,
+                'end_date': DEFAULT_DATE,
+            },
+            schedule_interval='@daily')
+
+    def test_inline_workflow(self):
+        with patch(HOOK) as MockHook:
+            hook = MockHook()
+            hook.get_conn.return_value = self.mock_conn
+            hook.await.return_value = None
+
+            template = {
+                "placement": {
+                    "managed_cluster": {
+                        "cluster_name": CLUSTER_NAME,
+                        "config": {
+                            "gce_cluster_config": {
+                                "zone_uri": ZONE,
+                            }
+                        }
+                    }
+                },
+                "jobs": [
+                    {
+                        "step_id": "say-hello",
+                        "pig_job": {
+                            "query": "sh echo hello"
+                        }
+                    }],
+            }
+
+            dataproc_task = DataprocWorkflowTemplateInstantiateInlineOperator(
+                task_id=TASK_ID,
+                project_id=PROJECT_ID,
+                region=REGION,
+                template=template,
+                dag=self.dag
+            )
+
+            dataproc_task.execute(None)
+            self.mock_workflows.instantiateInline.assert_called_once_with(
+                parent='projects/test-project-id/regions/test-region',
+                instanceId=mock.ANY,
+                body=template)
+            hook.await.assert_called_once_with(self.operation)