Posted to commits@airflow.apache.org by ka...@apache.org on 2018/05/14 15:38:40 UTC

incubator-airflow git commit: [AIRFLOW-2461] Add support for cluster scaling on dataproc operator

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 92363490b -> 9c915c1c8


[AIRFLOW-2461] Add support for cluster scaling on dataproc operator

Closes #3357 from piffall/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9c915c1c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9c915c1c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9c915c1c

Branch: refs/heads/master
Commit: 9c915c1c8b08de5cb1925bdd3454649bf3b46bcb
Parents: 9236349
Author: Cristòfol Torrens <to...@bluekiri.com>
Authored: Mon May 14 16:38:28 2018 +0100
Committer: Kaxil Naik <ka...@gmail.com>
Committed: Mon May 14 16:38:28 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/operators/dataproc_operator.py  | 148 +++++++++++++++++++
 docs/code.rst                                   |   1 +
 docs/integration.rst                            |   8 +
 .../contrib/operators/test_dataproc_operator.py | 113 +++++++++++---
 4 files changed, 252 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9c915c1c/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 6b1dbee..23fae9a 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -377,6 +377,154 @@ class DataprocClusterCreateOperator(BaseOperator):
         self._wait_for_done(service)
 
 
+class DataprocClusterScaleOperator(BaseOperator):
+    """
+    Scale a cluster on Google Cloud Dataproc up or down.
+    The operator will wait until the cluster is re-scaled.
+
+    **Example**: ::
+
+        t1 = DataprocClusterScaleOperator(
+                task_id='dataproc_scale',
+                project_id='my-project',
+                cluster_name='cluster-1',
+                num_workers=10,
+                num_preemptible_workers=10,
+                graceful_decommission_timeout='1h',
+                dag=dag)
+
+    .. seealso::
+        For more detail on scaling clusters, have a look at the reference:
+        https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/scaling-clusters
+
+    :param cluster_name: The name of the cluster to scale.
+    :type cluster_name: string
+    :param project_id: The ID of the Google Cloud project in which
+        the cluster runs.
+    :type project_id: string
+    :param region: The region for the Dataproc cluster.
+    :type region: string
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: string
+    :param num_workers: The new number of workers.
+    :type num_workers: int
+    :param num_preemptible_workers: The new number of preemptible workers.
+    :type num_preemptible_workers: int
+    :param graceful_decommission_timeout: Timeout for graceful YARN decommissioning.
+        Maximum value is 1d.
+    :type graceful_decommission_timeout: string
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: string
+    """
+
+    template_fields = ['cluster_name', 'project_id', 'region']
+
+    @apply_defaults
+    def __init__(self,
+                 cluster_name,
+                 project_id,
+                 region='global',
+                 gcp_conn_id='google_cloud_default',
+                 delegate_to=None,
+                 num_workers=2,
+                 num_preemptible_workers=0,
+                 graceful_decommission_timeout=None,
+                 *args,
+                 **kwargs):
+        super(DataprocClusterScaleOperator, self).__init__(*args, **kwargs)
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.cluster_name = cluster_name
+        self.project_id = project_id
+        self.region = region
+        self.num_workers = num_workers
+        self.num_preemptible_workers = num_preemptible_workers
+
+        # Optional
+        self.optional_arguments = {}
+        if graceful_decommission_timeout:
+            self.optional_arguments['gracefulDecommissionTimeout'] = \
+                self._get_graceful_decommission_timeout(
+                    graceful_decommission_timeout)
+
+    def _wait_for_done(self, service, operation_name):
+        time.sleep(15)
+        while True:
+            try:
+                response = service.projects().regions().operations().get(
+                    name=operation_name
+                ).execute()
+
+                if 'done' in response and response['done']:
+                    if 'error' in response:
+                        raise Exception(str(response['error']))
+                    else:
+                        return
+                time.sleep(15)
+            except HttpError as e:
+                self.log.error("Operation not found.")
+                raise e
+
+    def _build_scale_cluster_data(self):
+        scale_data = {
+            'config': {
+                'workerConfig': {
+                    'numInstances': self.num_workers
+                },
+                'secondaryWorkerConfig': {
+                    'numInstances': self.num_preemptible_workers
+                }
+            }
+        }
+        return scale_data
+
+    def _get_graceful_decommission_timeout(self, timeout):
+        match = re.match(r"^(\d+)(s|m|h|d)$", timeout)
+        if match:
+            if match.group(2) == "s":
+                return timeout
+            elif match.group(2) == "m":
+                val = float(match.group(1))
+                return "{}s".format(int(timedelta(minutes=val).total_seconds()))
+            elif match.group(2) == "h":
+                val = float(match.group(1))
+                return "{}s".format(int(timedelta(hours=val).total_seconds()))
+            elif match.group(2) == "d":
+                val = float(match.group(1))
+                return "{}s".format(int(timedelta(days=val).total_seconds()))
+
+        raise AirflowException(
+            "DataprocClusterScaleOperator: the graceful_decommission_timeout"
+            " should be expressed in days, hours, minutes or seconds,"
+            " e.g. 1d, 4h, 10m, 30s")
+
+    def execute(self, context):
+        self.log.info("Scaling cluster: %s", self.cluster_name)
+        hook = DataProcHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to
+        )
+        service = hook.get_conn()
+
+        update_mask = "config.worker_config.num_instances," \
+                      + "config.secondary_worker_config.num_instances"
+        scaling_cluster_data = self._build_scale_cluster_data()
+
+        response = service.projects().regions().clusters().patch(
+            projectId=self.project_id,
+            region=self.region,
+            clusterName=self.cluster_name,
+            updateMask=update_mask,
+            body=scaling_cluster_data,
+            **self.optional_arguments
+        ).execute()
+        operation_name = response['name']
+        self.log.info("Cluster scale operation name: %s", operation_name)
+        self._wait_for_done(service, operation_name)
+
+
 class DataprocClusterDeleteOperator(BaseOperator):
     """
     Delete a cluster on Google Cloud Dataproc. The operator will wait until the

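For anyone who wants to try the new operator from a DAG, a minimal usage
sketch follows. This is not code from the commit: the project ID, cluster
name, and DAG settings below are placeholders.

    # Minimal usage sketch for DataprocClusterScaleOperator.
    # 'my-project' and 'cluster-1' are placeholder values.
    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.dataproc_operator import \
        DataprocClusterScaleOperator

    dag = DAG(
        'dataproc_scale_example',
        start_date=datetime(2018, 5, 1),
        schedule_interval=None)

    scale_down = DataprocClusterScaleOperator(
        task_id='dataproc_scale_down',
        project_id='my-project',
        region='global',
        cluster_name='cluster-1',
        num_workers=2,
        num_preemptible_workers=0,
        # Give YARN up to one hour to drain running containers
        # before worker nodes are removed.
        graceful_decommission_timeout='1h',
        dag=dag)

The task blocks until the Dataproc operation reports done, so downstream
tasks see the cluster at its new size.
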
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9c915c1c/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index 618f376..67519c9 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -126,6 +126,7 @@ Operators
 .. autoclass:: airflow.contrib.operators.dataflow_operator.DataflowTemplateOperator
 .. autoclass:: airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator
 .. autoclass:: airflow.contrib.operators.dataproc_operator.DataprocClusterCreateOperator
+.. autoclass:: airflow.contrib.operators.dataproc_operator.DataprocClusterScaleOperator
 .. autoclass:: airflow.contrib.operators.dataproc_operator.DataprocClusterDeleteOperator
 .. autoclass:: airflow.contrib.operators.dataproc_operator.DataProcPigOperator
 .. autoclass:: airflow.contrib.operators.dataproc_operator.DataProcHiveOperator

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9c915c1c/docs/integration.rst
----------------------------------------------------------------------
diff --git a/docs/integration.rst b/docs/integration.rst
index e8a06e1..5e54eb1 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -571,6 +571,7 @@ DataProc Operators
 
 - :ref:`DataprocClusterCreateOperator` : Create a new cluster on Google Cloud Dataproc.
 - :ref:`DataprocClusterDeleteOperator` : Delete a cluster on Google Cloud Dataproc.
+- :ref:`DataprocClusterScaleOperator` : Scale up or down a cluster on Google Cloud Dataproc.
 - :ref:`DataProcPigOperator` : Start a Pig query Job on a Cloud DataProc cluster.
 - :ref:`DataProcHiveOperator` : Start a Hive query Job on a Cloud DataProc cluster.
 - :ref:`DataProcSparkSqlOperator` : Start a Spark SQL query Job on a Cloud DataProc cluster.
@@ -587,6 +588,13 @@ DataprocClusterCreateOperator
 
 .. autoclass:: airflow.contrib.operators.dataproc_operator.DataprocClusterCreateOperator
 
+.. _DataprocClusterScaleOperator:
+
+DataprocClusterScaleOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.dataproc_operator.DataprocClusterScaleOperator
+
 .. _DataprocClusterDeleteOperator:
 
 DataprocClusterDeleteOperator

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9c915c1c/tests/contrib/operators/test_dataproc_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py
index d039cf1..6cb2044 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -24,14 +24,15 @@ import unittest
 
 from airflow import DAG
 from airflow.contrib.operators.dataproc_operator import \
-    DataprocClusterCreateOperator,\
-    DataprocClusterDeleteOperator,\
-    DataProcHadoopOperator,\
-    DataProcHiveOperator,\
-    DataProcPySparkOperator,\
-    DataProcSparkOperator,\
-    DataprocWorkflowTemplateInstantiateInlineOperator,\
-    DataprocWorkflowTemplateInstantiateOperator
+    DataprocClusterCreateOperator, \
+    DataprocClusterDeleteOperator, \
+    DataprocClusterScaleOperator, \
+    DataProcHadoopOperator, \
+    DataProcHiveOperator, \
+    DataProcPySparkOperator, \
+    DataProcSparkOperator, \
+    DataprocWorkflowTemplateInstantiateInlineOperator, \
+    DataprocWorkflowTemplateInstantiateOperator
 from airflow.version import version
 
 from copy import deepcopy
@@ -229,7 +230,8 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
                          "2017-06-07T00:00:00.000000Z")
 
     def test_cluster_name_log_no_sub(self):
-        with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') as mock_hook:
+        with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') \
+                as mock_hook:
             mock_hook.return_value.get_conn = self.mock_conn
             dataproc_task = DataprocClusterCreateOperator(
                 task_id=TASK_ID,
@@ -245,7 +247,8 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
                 mock_info.assert_called_with('Creating cluster: %s', CLUSTER_NAME)
 
     def test_cluster_name_log_sub(self):
-        with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') as mock_hook:
+        with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') \
+                as mock_hook:
             mock_hook.return_value.get_conn = self.mock_conn
             dataproc_task = DataprocClusterCreateOperator(
                 task_id=TASK_ID,
@@ -256,13 +259,83 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
                 dag=self.dag
             )
             with patch.object(dataproc_task.log, 'info') as mock_info:
-                context = { 'ts_nodash' : 'testnodash'}
+                context = {'ts_nodash': 'testnodash'}
 
-                rendered = dataproc_task.render_template('cluster_name', getattr(dataproc_task,'cluster_name'), context)
+                rendered = dataproc_task.render_template(
+                    'cluster_name',
+                    getattr(dataproc_task, 'cluster_name'), context)
                 setattr(dataproc_task, 'cluster_name', rendered)
-                with self.assertRaises(TypeError) as _:
+                with self.assertRaises(TypeError):
+                    dataproc_task.execute(None)
+                mock_info.assert_called_with('Creating cluster: %s',
+                                             u'smoke-cluster-testnodash')
+
+
+class DataprocClusterScaleOperatorTest(unittest.TestCase):
+    # Unit test for the DataprocClusterScaleOperator
+    def setUp(self):
+        self.mock_execute = Mock()
+        self.mock_execute.execute = Mock(return_value={'done': True})
+        self.mock_get = Mock()
+        self.mock_get.get = Mock(return_value=self.mock_execute)
+        self.mock_operations = Mock()
+        self.mock_operations.get = Mock(return_value=self.mock_get)
+        self.mock_regions = Mock()
+        self.mock_regions.operations = Mock(return_value=self.mock_operations)
+        self.mock_projects = Mock()
+        self.mock_projects.regions = Mock(return_value=self.mock_regions)
+        self.mock_conn = Mock()
+        self.mock_conn.projects = Mock(return_value=self.mock_projects)
+        self.dag = DAG(
+            'test_dag',
+            default_args={
+                'owner': 'airflow',
+                'start_date': DEFAULT_DATE,
+                'end_date': DEFAULT_DATE,
+            },
+            schedule_interval='@daily')
+
+    def test_cluster_name_log_no_sub(self):
+        with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') \
+                as mock_hook:
+            mock_hook.return_value.get_conn = self.mock_conn
+            dataproc_task = DataprocClusterScaleOperator(
+                task_id=TASK_ID,
+                cluster_name=CLUSTER_NAME,
+                project_id=PROJECT_ID,
+                num_workers=NUM_WORKERS,
+                num_preemptible_workers=NUM_PREEMPTIBLE_WORKERS,
+                dag=self.dag
+            )
+            with patch.object(dataproc_task.log, 'info') as mock_info:
+                with self.assertRaises(TypeError):
                     dataproc_task.execute(None)
-                mock_info.assert_called_with('Creating cluster: %s', u'smoke-cluster-testnodash')
+                mock_info.assert_called_with('Scaling cluster: %s', CLUSTER_NAME)
+
+    def test_cluster_name_log_sub(self):
+        with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') \
+                as mock_hook:
+            mock_hook.return_value.get_conn = self.mock_conn
+            dataproc_task = DataprocClusterScaleOperator(
+                task_id=TASK_ID,
+                cluster_name='smoke-cluster-{{ ts_nodash }}',
+                project_id=PROJECT_ID,
+                num_workers=NUM_WORKERS,
+                num_preemptible_workers=NUM_PREEMPTIBLE_WORKERS,
+                dag=self.dag
+            )
+
+            with patch.object(dataproc_task.log, 'info') as mock_info:
+                context = {'ts_nodash': 'testnodash'}
+
+                rendered = dataproc_task.render_template(
+                    'cluster_name',
+                    getattr(dataproc_task, 'cluster_name'), context)
+                setattr(dataproc_task, 'cluster_name', rendered)
+                with self.assertRaises(TypeError):
+                    dataproc_task.execute(None)
+                mock_info.assert_called_with('Scaling cluster: %s',
+                                             u'smoke-cluster-testnodash')
+
 
 class DataprocClusterDeleteOperatorTest(unittest.TestCase):
     # Unit test for the DataprocClusterDeleteOperator
@@ -313,13 +386,17 @@ class DataprocClusterDeleteOperatorTest(unittest.TestCase):
             )
 
             with patch.object(dataproc_task.log, 'info') as mock_info:
-                context = { 'ts_nodash' : 'testnodash'}
+                context = {'ts_nodash': 'testnodash'}
 
-                rendered = dataproc_task.render_template('cluster_name', getattr(dataproc_task,'cluster_name'), context)
+                rendered = dataproc_task.render_template(
+                    'cluster_name',
+                    getattr(dataproc_task, 'cluster_name'), context)
                 setattr(dataproc_task, 'cluster_name', rendered)
-                with self.assertRaises(TypeError) as _:
+                with self.assertRaises(TypeError):
                     dataproc_task.execute(None)
-                mock_info.assert_called_with('Deleting cluster: %s', u'smoke-cluster-testnodash')
+                mock_info.assert_called_with('Deleting cluster: %s',
+                                             u'smoke-cluster-testnodash')
+
 
 class DataProcHadoopOperatorTest(unittest.TestCase):
     # Unit test for the DataProcHadoopOperator
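
----------------------------------------------------------------------

A note on the graceful_decommission_timeout format: the operator normalizes
the 'Ns'/'Nm'/'Nh'/'Nd' strings documented above to a seconds string before
passing them to the clusters patch call. A standalone sketch of that
conversion rule (the function name here is illustrative, not part of this
commit):

    import re
    from datetime import timedelta

    def to_seconds_string(timeout):
        # Accepts the formats documented above: '30s', '10m', '4h', '1d'.
        match = re.match(r"^(\d+)(s|m|h|d)$", timeout)
        if not match:
            raise ValueError("expected e.g. 1d, 4h, 10m, 30s")
        val, unit = float(match.group(1)), match.group(2)
        delta = {'s': timedelta(seconds=val),
                 'm': timedelta(minutes=val),
                 'h': timedelta(hours=val),
                 'd': timedelta(days=val)}[unit]
        # total_seconds() is deliberate: timedelta.seconds discards
        # whole days, so '2d' would otherwise come back as '0s'.
        return "{}s".format(int(delta.total_seconds()))

    assert to_seconds_string('10m') == '600s'
    assert to_seconds_string('2d') == '172800s'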