You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2018/05/14 15:38:40 UTC
incubator-airflow git commit: [AIRFLOW-2461] Add support for cluster
scaling on dataproc operator
Repository: incubator-airflow
Updated Branches:
refs/heads/master 92363490b -> 9c915c1c8
[AIRFLOW-2461] Add support for cluster scaling on dataproc operator
Closes #3357 from piffall/master
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9c915c1c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9c915c1c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9c915c1c
Branch: refs/heads/master
Commit: 9c915c1c8b08de5cb1925bdd3454649bf3b46bcb
Parents: 9236349
Author: Cristòfol Torrens <to...@bluekiri.com>
Authored: Mon May 14 16:38:28 2018 +0100
Committer: Kaxil Naik <ka...@gmail.com>
Committed: Mon May 14 16:38:28 2018 +0100
----------------------------------------------------------------------
airflow/contrib/operators/dataproc_operator.py | 148 +++++++++++++++++++
docs/code.rst | 1 +
docs/integration.rst | 8 +
.../contrib/operators/test_dataproc_operator.py | 113 +++++++++++---
4 files changed, 252 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9c915c1c/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 6b1dbee..23fae9a 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -377,6 +377,154 @@ class DataprocClusterCreateOperator(BaseOperator):
self._wait_for_done(service)
class DataprocClusterScaleOperator(BaseOperator):
    """
    Scale, up or down, a cluster on Google Cloud Dataproc.
    The operator will wait until the cluster is re-scaled.

    **Example**: ::

        t1 = DataprocClusterScaleOperator(
                task_id='dataproc_scale',
                project_id='my-project',
                cluster_name='cluster-1',
                num_workers=10,
                num_preemptible_workers=10,
                graceful_decommission_timeout='1h',
                dag=dag)

    .. seealso::
        For more detail about scaling clusters have a look at the reference:
        https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/scaling-clusters

    :param cluster_name: The name of the cluster to scale.
    :type cluster_name: string
    :param project_id: The ID of the google cloud project in which
        the cluster runs
    :type project_id: string
    :param region: The region for the dataproc cluster
    :type region: string
    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
    :type gcp_conn_id: string
    :param num_workers: The new number of workers
    :type num_workers: int
    :param num_preemptible_workers: The new number of preemptible workers
    :type num_preemptible_workers: int
    :param graceful_decommission_timeout: Timeout for graceful YARN decommissioning.
        Maximum value is 1d
    :type graceful_decommission_timeout: string
    :param delegate_to: The account to impersonate, if any.
        For this to work, the service account making the request must have domain-wide
        delegation enabled.
    :type delegate_to: string
    """

    template_fields = ['cluster_name', 'project_id', 'region']

    @apply_defaults
    def __init__(self,
                 cluster_name,
                 project_id,
                 region='global',
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 num_workers=2,
                 num_preemptible_workers=0,
                 graceful_decommission_timeout=None,
                 *args,
                 **kwargs):
        super(DataprocClusterScaleOperator, self).__init__(*args, **kwargs)
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        self.cluster_name = cluster_name
        self.project_id = project_id
        self.region = region
        self.num_workers = num_workers
        self.num_preemptible_workers = num_preemptible_workers

        # Only send gracefulDecommissionTimeout to the API when the user
        # asked for one; the default is an immediate (non-graceful) scale.
        self.optional_arguments = {}
        if graceful_decommission_timeout:
            self.optional_arguments['gracefulDecommissionTimeout'] = \
                self._get_graceful_decommission_timeout(
                    graceful_decommission_timeout)

    def _wait_for_done(self, service, operation_name):
        """
        Poll the named Dataproc operation every 15s until it is done.

        :param service: authorized Dataproc API client connection.
        :param operation_name: fully-qualified operation resource name.
        :raises Exception: when the finished operation reports an error.
        :raises HttpError: when the operation cannot be fetched.
        """
        time.sleep(15)
        while True:
            try:
                response = service.projects().regions().operations().get(
                    name=operation_name
                ).execute()

                if response.get('done'):
                    if 'error' in response:
                        raise Exception(str(response['error']))
                    return
                time.sleep(15)
            except HttpError as e:
                self.log.error("Operation not found.")
                raise e

    def _build_scale_cluster_data(self):
        """Build the clusters.patch request body with the new worker counts."""
        return {
            'config': {
                'workerConfig': {
                    'numInstances': self.num_workers
                },
                'secondaryWorkerConfig': {
                    'numInstances': self.num_preemptible_workers
                }
            }
        }

    def _get_graceful_decommission_timeout(self, timeout):
        """
        Convert a duration such as '30s', '10m', '4h' or '1d' into the
        seconds-based 'Ns' form the Dataproc API expects.

        :param timeout: duration string, digits followed by s/m/h/d.
        :raises AirflowException: when `timeout` is not in the expected form.
        """
        match = re.match(r"^(\d+)(s|m|h|d)$", timeout)
        if match:
            val = int(match.group(1))
            unit = match.group(2)
            if unit == "s":
                return timeout
            # Use total_seconds(), NOT timedelta.seconds: .seconds drops whole
            # days, so '1d' (the documented maximum) or '24h' would become '0s'.
            elif unit == "m":
                return "{}s".format(int(timedelta(minutes=val).total_seconds()))
            elif unit == "h":
                return "{}s".format(int(timedelta(hours=val).total_seconds()))
            elif unit == "d":
                return "{}s".format(int(timedelta(days=val).total_seconds()))

        raise AirflowException(
            "DataprocClusterScaleOperator: graceful_decommission_timeout"
            " should be expressed in days, hours, minutes or seconds."
            " i.e. 1d, 4h, 10m, 30s")

    def execute(self, context):
        """Scale the cluster via clusters().patch() and wait for completion."""
        self.log.info("Scaling cluster: %s", self.cluster_name)
        hook = DataProcHook(
            gcp_conn_id=self.gcp_conn_id,
            delegate_to=self.delegate_to
        )
        service = hook.get_conn()

        # updateMask tells the API which cluster config fields to change.
        update_mask = "config.worker_config.num_instances," \
                      + "config.secondary_worker_config.num_instances"
        scaling_cluster_data = self._build_scale_cluster_data()

        response = service.projects().regions().clusters().patch(
            projectId=self.project_id,
            region=self.region,
            clusterName=self.cluster_name,
            updateMask=update_mask,
            body=scaling_cluster_data,
            **self.optional_arguments
        ).execute()
        operation_name = response['name']
        self.log.info("Cluster scale operation name: %s", operation_name)
        self._wait_for_done(service, operation_name)
+
class DataprocClusterDeleteOperator(BaseOperator):
"""
Delete a cluster on Google Cloud Dataproc. The operator will wait until the
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9c915c1c/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index 618f376..67519c9 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -126,6 +126,7 @@ Operators
.. autoclass:: airflow.contrib.operators.dataflow_operator.DataflowTemplateOperator
.. autoclass:: airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator
.. autoclass:: airflow.contrib.operators.dataproc_operator.DataprocClusterCreateOperator
+.. autoclass:: airflow.contrib.operators.dataproc_operator.DataprocClusterScaleOperator
.. autoclass:: airflow.contrib.operators.dataproc_operator.DataprocClusterDeleteOperator
.. autoclass:: airflow.contrib.operators.dataproc_operator.DataProcPigOperator
.. autoclass:: airflow.contrib.operators.dataproc_operator.DataProcHiveOperator
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9c915c1c/docs/integration.rst
----------------------------------------------------------------------
diff --git a/docs/integration.rst b/docs/integration.rst
index e8a06e1..5e54eb1 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -571,6 +571,7 @@ DataProc Operators
- :ref:`DataprocClusterCreateOperator` : Create a new cluster on Google Cloud Dataproc.
- :ref:`DataprocClusterDeleteOperator` : Delete a cluster on Google Cloud Dataproc.
+- :ref:`DataprocClusterScaleOperator` : Scale up or down a cluster on Google Cloud Dataproc.
- :ref:`DataProcPigOperator` : Start a Pig query Job on a Cloud DataProc cluster.
- :ref:`DataProcHiveOperator` : Start a Hive query Job on a Cloud DataProc cluster.
- :ref:`DataProcSparkSqlOperator` : Start a Spark SQL query Job on a Cloud DataProc cluster.
@@ -587,6 +588,13 @@ DataprocClusterCreateOperator
.. autoclass:: airflow.contrib.operators.dataproc_operator.DataprocClusterCreateOperator
+.. _DataprocClusterScaleOperator:
+
+DataprocClusterScaleOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.dataproc_operator.DataprocClusterScaleOperator
+
.. _DataprocClusterDeleteOperator:
DataprocClusterDeleteOperator
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9c915c1c/tests/contrib/operators/test_dataproc_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py
index d039cf1..6cb2044 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -24,14 +24,15 @@ import unittest
from airflow import DAG
from airflow.contrib.operators.dataproc_operator import \
- DataprocClusterCreateOperator,\
- DataprocClusterDeleteOperator,\
- DataProcHadoopOperator,\
- DataProcHiveOperator,\
- DataProcPySparkOperator,\
- DataProcSparkOperator,\
- DataprocWorkflowTemplateInstantiateInlineOperator,\
- DataprocWorkflowTemplateInstantiateOperator
+ DataprocClusterCreateOperator, \
+ DataprocClusterDeleteOperator, \
+ DataProcHadoopOperator, \
+ DataProcHiveOperator, \
+ DataProcPySparkOperator, \
+ DataProcSparkOperator, \
+ DataprocWorkflowTemplateInstantiateInlineOperator, \
+ DataprocWorkflowTemplateInstantiateOperator, \
+ DataprocClusterScaleOperator
from airflow.version import version
from copy import deepcopy
@@ -229,7 +230,8 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
"2017-06-07T00:00:00.000000Z")
def test_cluster_name_log_no_sub(self):
- with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') as mock_hook:
+ with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') \
+ as mock_hook:
mock_hook.return_value.get_conn = self.mock_conn
dataproc_task = DataprocClusterCreateOperator(
task_id=TASK_ID,
@@ -245,7 +247,8 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
mock_info.assert_called_with('Creating cluster: %s', CLUSTER_NAME)
def test_cluster_name_log_sub(self):
- with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') as mock_hook:
+ with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') \
+ as mock_hook:
mock_hook.return_value.get_conn = self.mock_conn
dataproc_task = DataprocClusterCreateOperator(
task_id=TASK_ID,
@@ -256,13 +259,83 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
dag=self.dag
)
with patch.object(dataproc_task.log, 'info') as mock_info:
- context = { 'ts_nodash' : 'testnodash'}
+ context = {'ts_nodash': 'testnodash'}
- rendered = dataproc_task.render_template('cluster_name', getattr(dataproc_task,'cluster_name'), context)
+ rendered = dataproc_task.render_template(
+ 'cluster_name',
+ getattr(dataproc_task, 'cluster_name'), context)
setattr(dataproc_task, 'cluster_name', rendered)
- with self.assertRaises(TypeError) as _:
+ with self.assertRaises(TypeError):
+ dataproc_task.execute(None)
+ mock_info.assert_called_with('Creating cluster: %s',
+ u'smoke-cluster-testnodash')
+
+
+class DataprocClusterScaleOperatorTest(unittest.TestCase):
+ # Unit test for the DataprocClusterScaleOperator
+ def setUp(self):
+ self.mock_execute = Mock()
+ self.mock_execute.execute = Mock(return_value={'done': True})
+ self.mock_get = Mock()
+ self.mock_get.get = Mock(return_value=self.mock_execute)
+ self.mock_operations = Mock()
+ self.mock_operations.get = Mock(return_value=self.mock_get)
+ self.mock_regions = Mock()
+ self.mock_regions.operations = Mock(return_value=self.mock_operations)
+ self.mock_projects = Mock()
+ self.mock_projects.regions = Mock(return_value=self.mock_regions)
+ self.mock_conn = Mock()
+ self.mock_conn.projects = Mock(return_value=self.mock_projects)
+ self.dag = DAG(
+ 'test_dag',
+ default_args={
+ 'owner': 'airflow',
+ 'start_date': DEFAULT_DATE,
+ 'end_date': DEFAULT_DATE,
+ },
+ schedule_interval='@daily')
+
+ def test_cluster_name_log_no_sub(self):
+ with patch('airflow.contrib.hooks.gcp_dataproc_hook.DataProcHook') as mock_hook:
+ mock_hook.return_value.get_conn = self.mock_conn
+ dataproc_task = DataprocClusterScaleOperator(
+ task_id=TASK_ID,
+ cluster_name=CLUSTER_NAME,
+ project_id=PROJECT_ID,
+ num_workers=NUM_WORKERS,
+ num_preemptible_workers=NUM_PREEMPTIBLE_WORKERS,
+ dag=self.dag
+ )
+ with patch.object(dataproc_task.log, 'info') as mock_info:
+ with self.assertRaises(TypeError):
dataproc_task.execute(None)
- mock_info.assert_called_with('Creating cluster: %s', u'smoke-cluster-testnodash')
+ mock_info.assert_called_with('Scaling cluster: %s', CLUSTER_NAME)
+
+ def test_cluster_name_log_sub(self):
+ with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') \
+ as mock_hook:
+ mock_hook.return_value.get_conn = self.mock_conn
+ dataproc_task = DataprocClusterScaleOperator(
+ task_id=TASK_ID,
+ cluster_name='smoke-cluster-{{ ts_nodash }}',
+ project_id=PROJECT_ID,
+ num_workers=NUM_WORKERS,
+ num_preemptible_workers=NUM_PREEMPTIBLE_WORKERS,
+ dag=self.dag
+ )
+
+ with patch.object(dataproc_task.log, 'info') as mock_info:
+ context = {'ts_nodash': 'testnodash'}
+
+ rendered = dataproc_task.render_template(
+ 'cluster_name',
+ getattr(dataproc_task, 'cluster_name'), context)
+ setattr(dataproc_task, 'cluster_name', rendered)
+ with self.assertRaises(TypeError):
+ dataproc_task.execute(None)
+ mock_info.assert_called_with('Scaling cluster: %s',
+ u'smoke-cluster-testnodash')
+
class DataprocClusterDeleteOperatorTest(unittest.TestCase):
# Unit test for the DataprocClusterDeleteOperator
@@ -313,13 +386,17 @@ class DataprocClusterDeleteOperatorTest(unittest.TestCase):
)
with patch.object(dataproc_task.log, 'info') as mock_info:
- context = { 'ts_nodash' : 'testnodash'}
+ context = {'ts_nodash': 'testnodash'}
- rendered = dataproc_task.render_template('cluster_name', getattr(dataproc_task,'cluster_name'), context)
+ rendered = dataproc_task.render_template(
+ 'cluster_name',
+ getattr(dataproc_task, 'cluster_name'), context)
setattr(dataproc_task, 'cluster_name', rendered)
- with self.assertRaises(TypeError) as _:
+ with self.assertRaises(TypeError):
dataproc_task.execute(None)
- mock_info.assert_called_with('Deleting cluster: %s', u'smoke-cluster-testnodash')
+ mock_info.assert_called_with('Deleting cluster: %s',
+ u'smoke-cluster-testnodash')
+
class DataProcHadoopOperatorTest(unittest.TestCase):
# Unit test for the DataProcHadoopOperator