Posted to commits@airflow.apache.org by cr...@apache.org on 2018/01/24 15:24:04 UTC
incubator-airflow git commit: [AIRFLOW-2016] Add support for Dataproc Workflow Templates
Repository: incubator-airflow
Updated Branches:
refs/heads/master 18d09a948 -> 0990ba8c0
[AIRFLOW-2016] Add support for Dataproc Workflow Templates
Closes #2958 from DanSedov/master
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0990ba8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0990ba8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0990ba8c
Branch: refs/heads/master
Commit: 0990ba8c027a6d438a207c6afc202a7e40ebbc7f
Parents: 18d09a9
Author: Dan Sedov <se...@google.com>
Authored: Wed Jan 24 07:23:48 2018 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Wed Jan 24 07:23:48 2018 -0800
----------------------------------------------------------------------
airflow/contrib/hooks/gcp_dataproc_hook.py | 68 ++++++++--
airflow/contrib/operators/dataproc_operator.py | 115 +++++++++++++++-
.../contrib/operators/test_dataproc_operator.py | 134 ++++++++++++++++++-
3 files changed, 301 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0990ba8c/airflow/contrib/hooks/gcp_dataproc_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py
index 5b96484..bc5aa83 100644
--- a/airflow/contrib/hooks/gcp_dataproc_hook.py
+++ b/airflow/contrib/hooks/gcp_dataproc_hook.py
@@ -141,24 +141,76 @@ class _DataProcJobBuilder:
        return self.job


+class _DataProcOperation(LoggingMixin):
+    """Continuously polls a Dataproc Operation until it completes."""
+    def __init__(self, dataproc_api, operation):
+        self.dataproc_api = dataproc_api
+        self.operation = operation
+        self.operation_name = self.operation['name']
+
+    def wait_for_done(self):
+        if self._check_done():
+            return True
+
+        self.log.info(
+            'Waiting for Dataproc Operation %s to finish', self.operation_name)
+        while True:
+            time.sleep(10)
+            self.operation = (
+                self.dataproc_api.projects()
+                .regions()
+                .operations()
+                .get(name=self.operation_name)
+                .execute(num_retries=5))
+
+            if self._check_done():
+                return True
+
+    def get(self):
+        return self.operation
+
+    def _check_done(self):
+        if 'done' in self.operation:
+            if 'error' in self.operation:
+                self.log.warning(
+                    'Dataproc Operation %s failed with error: %s',
+                    self.operation_name, self.operation['error']['message'])
+                self._raise_error()
+            else:
+                self.log.info(
+                    'Dataproc Operation %s done', self.operation['name'])
+                return True
+        return False
+
+    def _raise_error(self):
+        raise Exception('Google Dataproc Operation %s failed: %s' %
+                        (self.operation_name, self.operation['error']['message']))
+
+
class DataProcHook(GoogleCloudBaseHook):
+    """Hook for Google Cloud Dataproc APIs."""
    def __init__(self,
                 gcp_conn_id='google_cloud_default',
-                 delegate_to=None):
+                 delegate_to=None,
+                 api_version='v1'):
        super(DataProcHook, self).__init__(gcp_conn_id, delegate_to)
+        self.api_version = api_version

    def get_conn(self):
-        """
-        Returns a Google Cloud DataProc service object.
-        """
+        """Returns a Google Cloud Dataproc service object."""
        http_authorized = self._authorize()
-        return build('dataproc', 'v1', http=http_authorized)
+        return build('dataproc', self.api_version, http=http_authorized)

    def submit(self, project_id, job, region='global'):
        submitted = _DataProcJob(self.get_conn(), project_id, job, region)
        if not submitted.wait_for_done():
-            submitted.raise_error("DataProcTask has errors")
+            submitted.raise_error('DataProcTask has errors')

    def create_job_template(self, task_id, cluster_name, job_type, properties):
-        return _DataProcJobBuilder(self.project_id, task_id, cluster_name, job_type,
-                                   properties)
+        return _DataProcJobBuilder(self.project_id, task_id, cluster_name,
+                                   job_type, properties)
+
+    def await(self, operation):
+        """Waits for a Google Cloud Dataproc Operation to complete."""
+        submitted = _DataProcOperation(self.get_conn(), operation)
+        submitted.wait_for_done()
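
For context, a minimal sketch of how the new hook pieces fit together
(project and template names below are illustrative placeholders, not part
of this commit):

    from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook

    # Workflow templates live under the v1beta2 API surface.
    hook = DataProcHook(api_version='v1beta2')

    # Start a long-running operation; the API returns a dict describing
    # it, including its 'name'.
    operation = (
        hook.get_conn().projects().regions().workflowTemplates()
        .instantiate(
            name='projects/my-project/regions/global/'
                 'workflowTemplates/my-template',
            body={'instanceId': 'example-instance-id'})
        .execute())

    # _DataProcOperation polls every 10 seconds until the operation
    # reports 'done', and raises if it carries an 'error'.
    hook.await(operation)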
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0990ba8c/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 3b10382..ece6d51 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -14,8 +14,11 @@
#
import time
+import uuid
+
from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook
+from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.version import version
@@ -92,7 +95,7 @@ class DataprocClusterCreateOperator(BaseOperator):
    :type service_account_scopes: list[string]
    """

-    template_fields = ['cluster_name', ]
+    template_fields = ['cluster_name']

    @apply_defaults
    def __init__(self,
@@ -928,3 +931,113 @@ class DataProcPySparkOperator(BaseOperator):
        job.set_job_name(self.job_name)
        hook.submit(hook.project_id, job.build(), self.region)
+
+
+class DataprocWorkflowTemplateBaseOperator(BaseOperator):
+    """Base class for Dataproc workflow template operators.
+
+    Subclasses declare their own template_fields, since each one templates
+    a different attribute.
+    """
+
+    @apply_defaults
+    def __init__(self,
+                 project_id,
+                 region='global',
+                 gcp_conn_id='google_cloud_default',
+                 delegate_to=None,
+                 *args,
+                 **kwargs):
+        super(DataprocWorkflowTemplateBaseOperator, self).__init__(
+            *args, **kwargs)
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.project_id = project_id
+        self.region = region
+        self.hook = DataProcHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            api_version='v1beta2'
+        )
+
+    def execute(self, context):
+        self.hook.await(self.start())
+
+    def start(self):
+        # Subclasses must override this to begin a Dataproc operation.
+        raise AirflowException('Please start a workflow operation')
+
+
+class DataprocWorkflowTemplateInstantiateOperator(DataprocWorkflowTemplateBaseOperator):
+    """
+    Instantiate a WorkflowTemplate on Google Cloud Dataproc. The operator will
+    wait until the WorkflowTemplate is finished executing.
+
+    Please refer to:
+    https://cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.workflowTemplates/instantiate
+
+    :param template_id: The ID of the template.
+    :type template_id: string
+    :param project_id: The ID of the Google Cloud project in which
+        the template runs.
+    :type project_id: string
+    :param region: Leave as 'global'; this parameter may become relevant
+        in the future.
+    :type region: string
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud
+        Platform.
+    :type gcp_conn_id: string
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: string
+    """
+
+    template_fields = ['template_id']
+
+    @apply_defaults
+    def __init__(self, template_id, *args, **kwargs):
+        super(DataprocWorkflowTemplateInstantiateOperator, self).__init__(
+            *args, **kwargs)
+        self.template_id = template_id
+
+    def start(self):
+        self.log.info('Instantiating Template: %s', self.template_id)
+        return (
+            self.hook.get_conn().projects().regions().workflowTemplates()
+            .instantiate(
+                name=('projects/%s/regions/%s/workflowTemplates/%s' %
+                      (self.project_id, self.region, self.template_id)),
+                body={'instanceId': str(uuid.uuid1())})
+            .execute())
+
+
+class DataprocWorkflowTemplateInstantiateInlineOperator(
+        DataprocWorkflowTemplateBaseOperator):
+    """
+    Instantiate a WorkflowTemplate inline on Google Cloud Dataproc. The
+    operator will wait until the WorkflowTemplate is finished executing.
+
+    Please refer to:
+    https://cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.workflowTemplates/instantiateInline
+
+    :param template: The template contents, in the same structure as the
+        workflowTemplates resource.
+    :type template: dict
+    :param project_id: The ID of the Google Cloud project in which
+        the template runs.
+    :type project_id: string
+    :param region: Leave as 'global'; this parameter may become relevant
+        in the future.
+    :type region: string
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud
+        Platform.
+    :type gcp_conn_id: string
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: string
+    """
+
+    template_fields = ['template']
+
+    @apply_defaults
+    def __init__(self, template, *args, **kwargs):
+        super(DataprocWorkflowTemplateInstantiateInlineOperator,
+              self).__init__(*args, **kwargs)
+        self.template = template
+
+    def start(self):
+        self.log.info('Instantiating Inline Template')
+        return (
+            self.hook.get_conn().projects().regions().workflowTemplates()
+            .instantiateInline(
+                parent='projects/%s/regions/%s' % (self.project_id, self.region),
+                instanceId=str(uuid.uuid1()),
+                body=self.template)
+            .execute())
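
As a usage sketch, the new operators slot into a DAG like any other
(project, cluster, and date values here are placeholders; the inline
template mirrors the one used in the tests below):

    import datetime

    from airflow import DAG
    from airflow.contrib.operators.dataproc_operator import (
        DataprocWorkflowTemplateInstantiateInlineOperator,
        DataprocWorkflowTemplateInstantiateOperator,
    )

    dag = DAG(
        'dataproc_workflow_example',
        start_date=datetime.datetime(2018, 1, 1),
        schedule_interval='@once')

    # Run a workflow template previously created in the project.
    instantiate_task = DataprocWorkflowTemplateInstantiateOperator(
        task_id='instantiate_template',
        project_id='my-project',
        region='global',
        template_id='my-template',
        dag=dag)

    # Or define the whole template inline.
    inline_task = DataprocWorkflowTemplateInstantiateInlineOperator(
        task_id='instantiate_inline_template',
        project_id='my-project',
        region='global',
        template={
            'placement': {
                'managed_cluster': {
                    'cluster_name': 'example-cluster',
                    'config': {
                        'gce_cluster_config': {'zone_uri': 'us-central1-a'},
                    },
                },
            },
            'jobs': [
                {'step_id': 'say-hello',
                 'pig_job': {'query': 'sh echo hello'}},
            ],
        },
        dag=dag)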
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0990ba8c/tests/contrib/operators/test_dataproc_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py
index 2df056a..448da86 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -18,12 +18,15 @@ import re
import unittest
from airflow import DAG
-from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator
-from airflow.contrib.operators.dataproc_operator import DataprocClusterDeleteOperator
-from airflow.contrib.operators.dataproc_operator import DataProcHadoopOperator
-from airflow.contrib.operators.dataproc_operator import DataProcHiveOperator
-from airflow.contrib.operators.dataproc_operator import DataProcPySparkOperator
-from airflow.contrib.operators.dataproc_operator import DataProcSparkOperator
+from airflow.contrib.operators.dataproc_operator import (
+    DataprocClusterCreateOperator,
+    DataprocClusterDeleteOperator,
+    DataProcHadoopOperator,
+    DataProcHiveOperator,
+    DataProcPySparkOperator,
+    DataProcSparkOperator,
+    DataprocWorkflowTemplateInstantiateInlineOperator,
+    DataprocWorkflowTemplateInstantiateOperator,
+)
from airflow.version import version
from copy import deepcopy
@@ -55,7 +58,7 @@ WORKER_MACHINE_TYPE = 'n1-standard-2'
WORKER_DISK_SIZE = 100
NUM_PREEMPTIBLE_WORKERS = 2
LABEL1 = {}
-LABEL2 = {'application':'test', 'year': 2017}
+LABEL2 = {'application': 'test', 'year': 2017}
SERVICE_ACCOUNT_SCOPES = [
'https://www.googleapis.com/auth/bigquery',
'https://www.googleapis.com/auth/bigtable.data'
@@ -63,6 +66,10 @@ SERVICE_ACCOUNT_SCOPES = [
DEFAULT_DATE = datetime.datetime(2017, 6, 6)
REGION = 'test-region'
MAIN_URI = 'test-uri'
+TEMPLATE_ID = 'template-id'
+
+HOOK = 'airflow.contrib.operators.dataproc_operator.DataProcHook'
+
class DataprocClusterCreateOperatorTest(unittest.TestCase):
    # Unit test for the DataprocClusterCreateOperator
@@ -290,3 +297,116 @@ class DataProcSparkOperatorTest(unittest.TestCase):
        dataproc_task.execute(None)
        mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY, REGION)
+
+
+class DataprocWorkflowTemplateInstantiateOperatorTest(unittest.TestCase):
+    def setUp(self):
+        # Set up the mock chain for:
+        # service.projects().regions().workflowTemplates().instantiate().execute()
+        self.operation = {'name': 'operation', 'done': True}
+        self.mock_execute = Mock()
+        self.mock_execute.execute.return_value = self.operation
+        self.mock_workflows = Mock()
+        self.mock_workflows.instantiate.return_value = self.mock_execute
+        self.mock_regions = Mock()
+        self.mock_regions.workflowTemplates.return_value = self.mock_workflows
+        self.mock_projects = Mock()
+        self.mock_projects.regions.return_value = self.mock_regions
+        self.mock_conn = Mock()
+        self.mock_conn.projects.return_value = self.mock_projects
+        self.dag = DAG(
+            'test_dag',
+            default_args={
+                'owner': 'airflow',
+                'start_date': DEFAULT_DATE,
+                'end_date': DEFAULT_DATE,
+            },
+            schedule_interval='@daily')
+
+    def test_workflow(self):
+        with patch(HOOK) as MockHook:
+            hook = MockHook()
+            hook.get_conn.return_value = self.mock_conn
+            hook.await.return_value = None
+
+            dataproc_task = DataprocWorkflowTemplateInstantiateOperator(
+                task_id=TASK_ID,
+                project_id=PROJECT_ID,
+                region=REGION,
+                template_id=TEMPLATE_ID,
+                dag=self.dag
+            )
+
+            dataproc_task.execute(None)
+            template_name = (
+                'projects/test-project-id/regions/test-region/'
+                'workflowTemplates/template-id')
+            self.mock_workflows.instantiate.assert_called_once_with(
+                name=template_name,
+                body=mock.ANY)
+            hook.await.assert_called_once_with(self.operation)
+
+
+class DataprocWorkflowTemplateInstantiateInlineOperatorTest(unittest.TestCase):
+    def setUp(self):
+        # Set up the mock chain for:
+        # service.projects().regions().workflowTemplates().instantiateInline()
+        #     .execute()
+        self.operation = {'name': 'operation', 'done': True}
+        self.mock_execute = Mock()
+        self.mock_execute.execute.return_value = self.operation
+        self.mock_workflows = Mock()
+        self.mock_workflows.instantiateInline.return_value = self.mock_execute
+        self.mock_regions = Mock()
+        self.mock_regions.workflowTemplates.return_value = self.mock_workflows
+        self.mock_projects = Mock()
+        self.mock_projects.regions.return_value = self.mock_regions
+        self.mock_conn = Mock()
+        self.mock_conn.projects.return_value = self.mock_projects
+        self.dag = DAG(
+            'test_dag',
+            default_args={
+                'owner': 'airflow',
+                'start_date': DEFAULT_DATE,
+                'end_date': DEFAULT_DATE,
+            },
+            schedule_interval='@daily')
+
+    def test_inline_workflow(self):
+        with patch(HOOK) as MockHook:
+            hook = MockHook()
+            hook.get_conn.return_value = self.mock_conn
+            hook.await.return_value = None
+
+            template = {
+                "placement": {
+                    "managed_cluster": {
+                        "cluster_name": CLUSTER_NAME,
+                        "config": {
+                            "gce_cluster_config": {
+                                "zone_uri": ZONE,
+                            }
+                        }
+                    }
+                },
+                "jobs": [
+                    {
+                        "step_id": "say-hello",
+                        "pig_job": {
+                            "query": "sh echo hello"
+                        }
+                    }],
+            }
+
+            dataproc_task = DataprocWorkflowTemplateInstantiateInlineOperator(
+                task_id=TASK_ID,
+                project_id=PROJECT_ID,
+                region=REGION,
+                template=template,
+                dag=self.dag
+            )
+
+            dataproc_task.execute(None)
+            self.mock_workflows.instantiateInline.assert_called_once_with(
+                parent='projects/test-project-id/regions/test-region',
+                instanceId=mock.ANY,
+                body=template)
+            hook.await.assert_called_once_with(self.operation)
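
One way to exercise the new tests locally, assuming the repository root is
on PYTHONPATH and the mock package is installed:

    python -m unittest tests.contrib.operators.test_dataproc_operator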