You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by tu...@apache.org on 2020/08/16 03:53:07 UTC

[airflow] branch master updated: Add Bigtable Update Instance Hook/Operator (#10340)

This is an automated email from the ASF dual-hosted git repository.

turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 382c101  Add Bigtable Update Instance Hook/Operator (#10340)
382c101 is described below

commit 382c1011b6bcebd22760e2f98419281ef1a09d1b
Author: Ryan Yuan <ry...@outlook.com>
AuthorDate: Sun Aug 16 13:52:14 2020 +1000

    Add Bigtable Update Instance Hook/Operator (#10340)
    
    Add Bigtable Update Instance Hook/Operator
---
 .../google/cloud/example_dags/example_bigtable.py  |  18 ++-
 airflow/providers/google/cloud/hooks/bigtable.py   |  46 +++++++
 .../providers/google/cloud/operators/bigtable.py   |  79 +++++++++++-
 docs/howto/operator/google/cloud/bigtable.rst      |  23 ++++
 .../providers/google/cloud/hooks/test_bigtable.py  |  43 +++++++
 .../google/cloud/operators/test_bigtable.py        | 141 ++++++++++++++++++++-
 6 files changed, 347 insertions(+), 3 deletions(-)

diff --git a/airflow/providers/google/cloud/example_dags/example_bigtable.py b/airflow/providers/google/cloud/example_dags/example_bigtable.py
index b51b6ff..f54c6de 100644
--- a/airflow/providers/google/cloud/example_dags/example_bigtable.py
+++ b/airflow/providers/google/cloud/example_dags/example_bigtable.py
@@ -51,7 +51,7 @@ from os import getenv
 from airflow import models
 from airflow.providers.google.cloud.operators.bigtable import (
     BigtableCreateInstanceOperator, BigtableCreateTableOperator, BigtableDeleteInstanceOperator,
-    BigtableDeleteTableOperator, BigtableUpdateClusterOperator,
+    BigtableDeleteTableOperator, BigtableUpdateClusterOperator, BigtableUpdateInstanceOperator,
 )
 from airflow.providers.google.cloud.sensors.bigtable import BigtableTableReplicationCompletedSensor
 from airflow.utils.dates import days_ago
@@ -59,8 +59,13 @@ from airflow.utils.dates import days_ago
 GCP_PROJECT_ID = getenv('GCP_PROJECT_ID', 'example-project')
 CBT_INSTANCE_ID = getenv('CBT_INSTANCE_ID', 'some-instance-id')
 CBT_INSTANCE_DISPLAY_NAME = getenv('CBT_INSTANCE_DISPLAY_NAME', 'Human-readable name')
+CBT_INSTANCE_DISPLAY_NAME_UPDATED = getenv(
+    "CBT_INSTANCE_DISPLAY_NAME_UPDATED", "Human-readable name - updated"
+)
 CBT_INSTANCE_TYPE = getenv('CBT_INSTANCE_TYPE', '2')
+CBT_INSTANCE_TYPE_PROD = getenv('CBT_INSTANCE_TYPE_PROD', '1')
 CBT_INSTANCE_LABELS = getenv('CBT_INSTANCE_LABELS', '{}')
+CBT_INSTANCE_LABELS_UPDATED = getenv('CBT_INSTANCE_LABELS', '{"env": "prod"}')
 CBT_CLUSTER_ID = getenv('CBT_CLUSTER_ID', 'some-cluster-id')
 CBT_CLUSTER_ZONE = getenv('CBT_CLUSTER_ZONE', 'europe-west1-b')
 CBT_CLUSTER_NODES = getenv('CBT_CLUSTER_NODES', '3')
@@ -103,6 +108,16 @@ with models.DAG(
     create_instance_task >> create_instance_task2
     # [END howto_operator_gcp_bigtable_instance_create]
 
+    # [START howto_operator_gcp_bigtable_instance_update]
+    update_instance_task = BigtableUpdateInstanceOperator(
+        instance_id=CBT_INSTANCE_ID,
+        instance_display_name=CBT_INSTANCE_DISPLAY_NAME_UPDATED,
+        instance_type=int(CBT_INSTANCE_TYPE_PROD),
+        instance_labels=json.loads(CBT_INSTANCE_LABELS_UPDATED),
+        task_id='update_instance_task',
+    )
+    # [END howto_operator_gcp_bigtable_instance_update]
+
     # [START howto_operator_gcp_bigtable_cluster_update]
     cluster_update_task = BigtableUpdateClusterOperator(
         project_id=GCP_PROJECT_ID,
@@ -186,6 +201,7 @@ with models.DAG(
     create_instance_task \
         >> create_table_task \
         >> cluster_update_task \
+        >> update_instance_task \
         >> delete_table_task
     create_instance_task2 \
         >> create_table_task2 \
diff --git a/airflow/providers/google/cloud/hooks/bigtable.py b/airflow/providers/google/cloud/hooks/bigtable.py
index 0627fcc..1cd277d 100644
--- a/airflow/providers/google/cloud/hooks/bigtable.py
+++ b/airflow/providers/google/cloud/hooks/bigtable.py
@@ -18,6 +18,7 @@
 """
 This module contains a Google Cloud Bigtable Hook.
 """
+import enum
 from typing import Dict, List, Optional, Sequence, Union
 
 from google.cloud.bigtable import Client
@@ -184,6 +185,51 @@ class BigtableHook(GoogleBaseHook):
         operation.result(timeout)
         return instance
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def update_instance(
+        self,
+        instance_id: str,
+        project_id: str,
+        instance_display_name: Optional[str] = None,
+        instance_type: Optional[Union[enums.Instance.Type, enum.IntEnum]] = None,
+        instance_labels: Optional[Dict] = None,
+        timeout: Optional[float] = None
+    ) -> Instance:
+        """
+        Update an existing instance.
+
+        :type instance_id: str
+        :param instance_id: The ID for the existing instance.
+        :type project_id: str
+        :param project_id: Optional, Google Cloud Platform project ID where the
+            BigTable exists. If set to None or missing,
+            the default project_id from the GCP connection is used.
+        :type instance_display_name: str
+        :param instance_display_name: (optional) Human-readable name of the instance.
+        :type instance_type: enums.Instance.Type or enum.IntEnum
+        :param instance_type: (optional) The type of the instance.
+        :type instance_labels: dict
+        :param instance_labels: (optional) Dictionary of labels to associate with the
+            instance.
+        :type timeout: int
+        :param timeout: (optional) timeout (in seconds) for instance update.
+            If None is not specified, Operator will wait indefinitely.
+        """
+        instance_type = enums.Instance.Type(instance_type)
+
+        instance = Instance(
+            instance_id=instance_id,
+            client=self._get_client(project_id=project_id),
+            display_name=instance_display_name,
+            instance_type=instance_type,
+            labels=instance_labels,
+        )
+
+        operation = instance.update()
+        operation.result(timeout)
+
+        return instance
+
     @staticmethod
     def create_table(
         instance: Instance,
diff --git a/airflow/providers/google/cloud/operators/bigtable.py b/airflow/providers/google/cloud/operators/bigtable.py
index 7098fb1..b8bed03 100644
--- a/airflow/providers/google/cloud/operators/bigtable.py
+++ b/airflow/providers/google/cloud/operators/bigtable.py
@@ -18,7 +18,8 @@
 """
 This module contains Google Cloud Bigtable operators.
 """
-from typing import Dict, Iterable, List, Optional
+import enum
+from typing import Dict, Iterable, List, Optional, Union
 
 import google.api_core.exceptions
 from google.cloud.bigtable.column_family import GarbageCollectionRule
@@ -158,6 +159,82 @@ class BigtableCreateInstanceOperator(BaseOperator, BigtableValidationMixin):
             raise e
 
 
+class BigtableUpdateInstanceOperator(BaseOperator, BigtableValidationMixin):
+    """
+    Updates an existing Cloud Bigtable instance.
+
+    For more details about instance update have a look at the reference:
+    https://googleapis.dev/python/bigtable/latest/instance.html#google.cloud.bigtable.instance.Instance.update
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigtableUpdateInstanceOperator`
+
+    :type instance_id: str
+    :param instance_id: The ID of the Cloud Bigtable instance to update.
+    :type project_id: str
+    :param project_id: Optional, the ID of the GCP project. If set to None or missing,
+            the default project_id from the GCP connection is used.
+    :type instance_display_name: str
+    :param instance_display_name: (optional) Human-readable name of the instance.
+    :type instance_type: enums.Instance.Type or enum.IntEnum
+    :param instance_type: (optional) The type of the instance.
+    :type instance_labels: dict
+    :param instance_labels: (optional) Dictionary of labels to associate
+        with the instance.
+    :type timeout: float
+    :param timeout: (optional) timeout (in seconds) for instance update.
+                    If None or not specified, Operator will wait indefinitely.
+    :param gcp_conn_id: The connection ID to use to connect to Google Cloud Platform.
+    :type gcp_conn_id: str
+    """
+
+    # Attributes checked by _validate_inputs() (provided by BigtableValidationMixin).
+    REQUIRED_ATTRIBUTES: Iterable[str] = ['instance_id']
+    template_fields: Iterable[str] = ['project_id', 'instance_id']
+
+    @apply_defaults
+    def __init__(self, *,
+                 instance_id: str,
+                 project_id: Optional[str] = None,
+                 instance_display_name: Optional[str] = None,
+                 instance_type: Optional[Union[enums.Instance.Type, enum.IntEnum]] = None,
+                 instance_labels: Optional[Dict] = None,
+                 timeout: Optional[float] = None,
+                 gcp_conn_id: str = 'google_cloud_default',
+                 **kwargs) -> None:
+        self.project_id = project_id
+        self.instance_id = instance_id
+        self.instance_display_name = instance_display_name
+        self.instance_type = instance_type
+        self.instance_labels = instance_labels
+        self.timeout = timeout
+        # Runs at construction time, before BaseOperator.__init__; the unit tests
+        # expect an AirflowException ("Empty parameter: instance_id") for an empty id.
+        self._validate_inputs()
+        self.gcp_conn_id = gcp_conn_id
+        super().__init__(**kwargs)
+
+    def execute(self, context):
+        """
+        Update the instance through BigtableHook.
+
+        :param context: Airflow task context (not used by this method).
+        :raises AirflowException: if the instance to update does not exist.
+        :raises google.api_core.exceptions.GoogleAPICallError: re-raised after
+            logging when the update call fails.
+        """
+        hook = BigtableHook(gcp_conn_id=self.gcp_conn_id)
+        # Look the instance up first so a missing instance yields a clear error
+        # instead of an API failure from the update call.
+        instance = hook.get_instance(project_id=self.project_id,
+                                     instance_id=self.instance_id)
+        if not instance:
+            raise AirflowException(
+                f"Dependency: instance '{self.instance_id}' does not exist."
+            )
+
+        try:
+            hook.update_instance(
+                project_id=self.project_id,
+                instance_id=self.instance_id,
+                instance_display_name=self.instance_display_name,
+                instance_type=self.instance_type,
+                instance_labels=self.instance_labels,
+                timeout=self.timeout,
+            )
+        except google.api_core.exceptions.GoogleAPICallError as e:
+            self.log.error('An error occurred. Exiting.')
+            raise e
+
+
 class BigtableDeleteInstanceOperator(BaseOperator, BigtableValidationMixin):
     """
     Deletes the Cloud Bigtable instance, including its clusters and all related tables.
diff --git a/docs/howto/operator/google/cloud/bigtable.rst b/docs/howto/operator/google/cloud/bigtable.rst
index befd2ea..b2f45bf 100644
--- a/docs/howto/operator/google/cloud/bigtable.rst
+++ b/docs/howto/operator/google/cloud/bigtable.rst
@@ -53,6 +53,29 @@ it will be retrieved from the GCP connection used. Both variants are shown:
     :start-after: [START howto_operator_gcp_bigtable_instance_create]
     :end-before: [END howto_operator_gcp_bigtable_instance_create]
 
+.. _howto/operator:BigtableUpdateInstanceOperator:
+
+BigtableUpdateInstanceOperator
+------------------------------
+
+Use the :class:`~airflow.providers.google.cloud.operators.bigtable.BigtableUpdateInstanceOperator`
+to update an existing Google Cloud Bigtable instance.
+
+Only the following configuration can be updated for an existing instance:
+instance_display_name, instance_type and instance_labels.
+
+Using the operator
+""""""""""""""""""
+
+You can create the operator with or without project id. If project id is missing
+it will be retrieved from the GCP connection used. The example below omits the project id:
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_bigtable.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_gcp_bigtable_instance_update]
+    :end-before: [END howto_operator_gcp_bigtable_instance_update]
+
 .. _howto/operator:BigtableDeleteInstanceOperator:
 
 BigtableDeleteInstanceOperator
diff --git a/tests/providers/google/cloud/hooks/test_bigtable.py b/tests/providers/google/cloud/hooks/test_bigtable.py
index 6939069..0d32e5f 100644
--- a/tests/providers/google/cloud/hooks/test_bigtable.py
+++ b/tests/providers/google/cloud/hooks/test_bigtable.py
@@ -22,6 +22,7 @@ import google
 import mock
 from google.cloud.bigtable import Client
 from google.cloud.bigtable.instance import Instance
+from google.cloud.bigtable_admin_v2 import enums
 from mock import PropertyMock
 
 from airflow.providers.google.cloud.hooks.bigtable import BigtableHook
@@ -31,6 +32,9 @@ from tests.providers.google.cloud.utils.base_gcp_mock import (
 )
 
 CBT_INSTANCE = 'instance'
+CBT_INSTANCE_DISPLAY_NAME = "test instance"
+CBT_INSTANCE_TYPE = enums.Instance.Type.PRODUCTION
+CBT_INSTANCE_LABELS = {"env": "sit"}
 CBT_CLUSTER = 'cluster'
 CBT_ZONE = 'zone'
 CBT_TABLE = 'table'
@@ -102,6 +106,23 @@ class TestBigtableHookNoDefaultProjectId(unittest.TestCase):
         instance_create.assert_called_once_with(clusters=mock.ANY)
         self.assertEqual(res.instance_id, 'instance')
 
+    @mock.patch('google.cloud.bigtable.instance.Instance.update')
+    @mock.patch('airflow.providers.google.cloud.hooks.bigtable.BigtableHook._get_client')
+    def test_update_instance_overridden_project_id(self, get_client, instance_update):
+        operation = mock.Mock()
+        operation.result_return_value = Instance(instance_id=CBT_INSTANCE, client=get_client)
+        instance_update.return_value = operation
+        res = self.bigtable_hook_no_default_project_id.update_instance(
+            project_id=GCP_PROJECT_ID_HOOK_UNIT_TEST,
+            instance_id=CBT_INSTANCE,
+            instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
+            instance_type=CBT_INSTANCE_TYPE,
+            instance_labels=CBT_INSTANCE_LABELS
+        )
+        get_client.assert_called_once_with(project_id='example-project')
+        instance_update.assert_called_once_with()
+        self.assertEqual(res.instance_id, 'instance')
+
     @mock.patch('airflow.providers.google.cloud.hooks.bigtable.BigtableHook._get_client')
     def test_delete_table_overridden_project_id(self, get_client):
         instance_method = get_client.return_value.instance
@@ -268,6 +289,28 @@ class TestBigtableHookDefaultProjectId(unittest.TestCase):
         instance_create.assert_called_once_with(clusters=mock.ANY)
         self.assertEqual(res.instance_id, 'instance')
 
+    @mock.patch(
+        'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id',
+        new_callable=PropertyMock,
+        return_value=GCP_PROJECT_ID_HOOK_UNIT_TEST
+    )
+    @mock.patch('google.cloud.bigtable.instance.Instance.update')
+    @mock.patch('airflow.providers.google.cloud.hooks.bigtable.BigtableHook._get_client')
+    def test_update_instance(self, get_client, instance_update, mock_project_id):
+        operation = mock.Mock()
+        operation.result_return_value = Instance(instance_id=CBT_INSTANCE, client=get_client)
+        instance_update.return_value = operation
+        res = self.bigtable_hook_default_project_id.update_instance(
+            instance_id=CBT_INSTANCE,
+            instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
+            instance_type=CBT_INSTANCE_TYPE,
+            instance_labels=CBT_INSTANCE_LABELS,
+            project_id=GCP_PROJECT_ID_HOOK_UNIT_TEST,
+        )
+        get_client.assert_called_once_with(project_id='example-project')
+        instance_update.assert_called_once_with()
+        self.assertEqual(res.instance_id, 'instance')
+
     @mock.patch('google.cloud.bigtable.instance.Instance.create')
     @mock.patch('airflow.providers.google.cloud.hooks.bigtable.BigtableHook._get_client')
     def test_create_instance_overridden_project_id(self, get_client, instance_create):
diff --git a/tests/providers/google/cloud/operators/test_bigtable.py b/tests/providers/google/cloud/operators/test_bigtable.py
index d3c2dfe..618872e 100644
--- a/tests/providers/google/cloud/operators/test_bigtable.py
+++ b/tests/providers/google/cloud/operators/test_bigtable.py
@@ -23,12 +23,13 @@ import google.api_core.exceptions
 import mock
 from google.cloud.bigtable.column_family import MaxVersionsGCRule
 from google.cloud.bigtable.instance import Instance
+from google.cloud.bigtable_admin_v2 import enums
 from parameterized import parameterized
 
 from airflow.exceptions import AirflowException
 from airflow.providers.google.cloud.operators.bigtable import (
     BigtableCreateInstanceOperator, BigtableCreateTableOperator, BigtableDeleteInstanceOperator,
-    BigtableDeleteTableOperator, BigtableUpdateClusterOperator,
+    BigtableDeleteTableOperator, BigtableUpdateClusterOperator, BigtableUpdateInstanceOperator,
 )
 
 PROJECT_ID = 'test_project_id'
@@ -37,6 +38,9 @@ CLUSTER_ID = 'test-cluster-id'
 CLUSTER_ZONE = 'us-central1-f'
 GCP_CONN_ID = 'test-gcp-conn-id'
 NODES = 5
+INSTANCE_DISPLAY_NAME = "test instance"
+INSTANCE_TYPE = enums.Instance.Type.PRODUCTION
+INSTANCE_LABELS = {"env": "sit"}
 TABLE_ID = 'test-table-id'
 INITIAL_SPLIT_KEYS = []  # type: List
 EMPTY_COLUMN_FAMILIES = {}  # type: Dict
@@ -133,6 +137,141 @@ class TestBigtableInstanceCreate(unittest.TestCase):
         )
 
 
+class TestBigtableInstanceUpdate(unittest.TestCase):
+    @mock.patch('airflow.providers.google.cloud.operators.bigtable.BigtableHook')
+    def test_delete_execute(self, mock_hook):
+        op = BigtableUpdateInstanceOperator(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            instance_display_name=INSTANCE_DISPLAY_NAME,
+            instance_type=INSTANCE_TYPE,
+            instance_labels=INSTANCE_LABELS,
+            task_id="id",
+            gcp_conn_id=GCP_CONN_ID
+        )
+        op.execute(None)
+        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
+        mock_hook.return_value.update_instance.assert_called_once_with(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            instance_display_name=INSTANCE_DISPLAY_NAME,
+            instance_type=INSTANCE_TYPE,
+            instance_labels=INSTANCE_LABELS,
+            timeout=None
+        )
+
+    @mock.patch('airflow.providers.google.cloud.operators.bigtable.BigtableHook')
+    def test_update_execute_empty_project_id(self, mock_hook):
+        op = BigtableUpdateInstanceOperator(
+            instance_id=INSTANCE_ID,
+            instance_display_name=INSTANCE_DISPLAY_NAME,
+            instance_type=INSTANCE_TYPE,
+            instance_labels=INSTANCE_LABELS,
+            task_id="id",
+            gcp_conn_id=GCP_CONN_ID
+        )
+        op.execute(None)
+        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
+        mock_hook.return_value.update_instance.assert_called_once_with(
+            project_id=None,
+            instance_id=INSTANCE_ID,
+            instance_display_name=INSTANCE_DISPLAY_NAME,
+            instance_type=INSTANCE_TYPE,
+            instance_labels=INSTANCE_LABELS,
+            timeout=None
+        )
+
+    @parameterized.expand([
+        ('instance_id', PROJECT_ID, ''),
+    ], testcase_func_name=lambda f, n, p: 'test_empty_attribute.empty_' + p.args[0])
+    @mock.patch('airflow.providers.google.cloud.operators.bigtable.BigtableHook')
+    def test_empty_attribute(self, missing_attribute, project_id, instance_id, mock_hook):
+        with self.assertRaises(AirflowException) as e:
+            BigtableUpdateInstanceOperator(
+                project_id=project_id,
+                instance_id=instance_id,
+                instance_display_name=INSTANCE_DISPLAY_NAME,
+                instance_type=INSTANCE_TYPE,
+                instance_labels=INSTANCE_LABELS,
+                task_id="id"
+            )
+        err = e.exception
+        self.assertEqual(str(err), 'Empty parameter: {}'.format(missing_attribute))
+        mock_hook.assert_not_called()
+
+    @mock.patch('airflow.providers.google.cloud.operators.bigtable.BigtableHook')
+    def test_update_instance_that_doesnt_exists(self, mock_hook):
+        mock_hook.return_value.get_instance.return_value = None
+
+        with self.assertRaises(AirflowException) as e:
+            op = BigtableUpdateInstanceOperator(
+                project_id=PROJECT_ID,
+                instance_id=INSTANCE_ID,
+                instance_display_name=INSTANCE_DISPLAY_NAME,
+                instance_type=INSTANCE_TYPE,
+                instance_labels=INSTANCE_LABELS,
+                task_id="id",
+                gcp_conn_id=GCP_CONN_ID
+            )
+            op.execute(None)
+
+        err = e.exception
+        self.assertEqual(str(err), "Dependency: instance '{}' does not exist.".format(
+            INSTANCE_ID))
+
+        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
+        mock_hook.return_value.update_instance.assert_not_called()
+
+    @mock.patch('airflow.providers.google.cloud.operators.bigtable.BigtableHook')
+    def test_update_instance_that_doesnt_exists_empty_project_id(self, mock_hook):
+        mock_hook.return_value.get_instance.return_value = None
+
+        with self.assertRaises(AirflowException) as e:
+            op = BigtableUpdateInstanceOperator(
+                instance_id=INSTANCE_ID,
+                instance_display_name=INSTANCE_DISPLAY_NAME,
+                instance_type=INSTANCE_TYPE,
+                instance_labels=INSTANCE_LABELS,
+                task_id="id",
+                gcp_conn_id=GCP_CONN_ID
+            )
+            op.execute(None)
+
+        err = e.exception
+        self.assertEqual(str(err), "Dependency: instance '{}' does not exist.".format(
+            INSTANCE_ID))
+
+        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
+        mock_hook.return_value.update_instance.assert_not_called()
+
+    @mock.patch('airflow.providers.google.cloud.operators.bigtable.BigtableHook')
+    def test_different_error_reraised(self, mock_hook):
+        op = BigtableUpdateInstanceOperator(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            instance_display_name=INSTANCE_DISPLAY_NAME,
+            instance_type=INSTANCE_TYPE,
+            instance_labels=INSTANCE_LABELS,
+            task_id="id",
+            gcp_conn_id=GCP_CONN_ID
+        )
+        mock_hook.return_value.update_instance.side_effect = mock.Mock(
+            side_effect=google.api_core.exceptions.GoogleAPICallError('error'))
+
+        with self.assertRaises(google.api_core.exceptions.GoogleAPICallError):
+            op.execute(None)
+
+        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
+        mock_hook.return_value.update_instance.assert_called_once_with(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            instance_display_name=INSTANCE_DISPLAY_NAME,
+            instance_type=INSTANCE_TYPE,
+            instance_labels=INSTANCE_LABELS,
+            timeout=None
+        )
+
+
 class TestBigtableClusterUpdate(unittest.TestCase):
     @parameterized.expand([
         ('instance_id', PROJECT_ID, '', CLUSTER_ID, NODES),