You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by tu...@apache.org on 2020/08/16 03:53:07 UTC
[airflow] branch master updated: Add Bigtable Update Instance
Hook/Operator (#10340)
This is an automated email from the ASF dual-hosted git repository.
turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 382c101 Add Bigtable Update Instance Hook/Operator (#10340)
382c101 is described below
commit 382c1011b6bcebd22760e2f98419281ef1a09d1b
Author: Ryan Yuan <ry...@outlook.com>
AuthorDate: Sun Aug 16 13:52:14 2020 +1000
Add Bigtable Update Instance Hook/Operator (#10340)
Add Bigtable Update Instance Hook/Operator
---
.../google/cloud/example_dags/example_bigtable.py | 18 ++-
airflow/providers/google/cloud/hooks/bigtable.py | 46 +++++++
.../providers/google/cloud/operators/bigtable.py | 79 +++++++++++-
docs/howto/operator/google/cloud/bigtable.rst | 23 ++++
.../providers/google/cloud/hooks/test_bigtable.py | 43 +++++++
.../google/cloud/operators/test_bigtable.py | 141 ++++++++++++++++++++-
6 files changed, 347 insertions(+), 3 deletions(-)
diff --git a/airflow/providers/google/cloud/example_dags/example_bigtable.py b/airflow/providers/google/cloud/example_dags/example_bigtable.py
index b51b6ff..f54c6de 100644
--- a/airflow/providers/google/cloud/example_dags/example_bigtable.py
+++ b/airflow/providers/google/cloud/example_dags/example_bigtable.py
@@ -51,7 +51,7 @@ from os import getenv
from airflow import models
from airflow.providers.google.cloud.operators.bigtable import (
BigtableCreateInstanceOperator, BigtableCreateTableOperator, BigtableDeleteInstanceOperator,
- BigtableDeleteTableOperator, BigtableUpdateClusterOperator,
+ BigtableDeleteTableOperator, BigtableUpdateClusterOperator, BigtableUpdateInstanceOperator,
)
from airflow.providers.google.cloud.sensors.bigtable import BigtableTableReplicationCompletedSensor
from airflow.utils.dates import days_ago
@@ -59,8 +59,13 @@ from airflow.utils.dates import days_ago
GCP_PROJECT_ID = getenv('GCP_PROJECT_ID', 'example-project')
CBT_INSTANCE_ID = getenv('CBT_INSTANCE_ID', 'some-instance-id')
CBT_INSTANCE_DISPLAY_NAME = getenv('CBT_INSTANCE_DISPLAY_NAME', 'Human-readable name')
+CBT_INSTANCE_DISPLAY_NAME_UPDATED = getenv(
+ "CBT_INSTANCE_DISPLAY_NAME_UPDATED", "Human-readable name - updated"
+)
CBT_INSTANCE_TYPE = getenv('CBT_INSTANCE_TYPE', '2')
+CBT_INSTANCE_TYPE_PROD = getenv('CBT_INSTANCE_TYPE_PROD', '1')
CBT_INSTANCE_LABELS = getenv('CBT_INSTANCE_LABELS', '{}')
+CBT_INSTANCE_LABELS_UPDATED = getenv('CBT_INSTANCE_LABELS', '{"env": "prod"}')
CBT_CLUSTER_ID = getenv('CBT_CLUSTER_ID', 'some-cluster-id')
CBT_CLUSTER_ZONE = getenv('CBT_CLUSTER_ZONE', 'europe-west1-b')
CBT_CLUSTER_NODES = getenv('CBT_CLUSTER_NODES', '3')
@@ -103,6 +108,16 @@ with models.DAG(
create_instance_task >> create_instance_task2
# [END howto_operator_gcp_bigtable_instance_create]
+ # [START howto_operator_gcp_bigtable_instance_update]
+ update_instance_task = BigtableUpdateInstanceOperator(
+ instance_id=CBT_INSTANCE_ID,
+ instance_display_name=CBT_INSTANCE_DISPLAY_NAME_UPDATED,
+ instance_type=int(CBT_INSTANCE_TYPE_PROD),
+ instance_labels=json.loads(CBT_INSTANCE_LABELS_UPDATED),
+ task_id='update_instance_task',
+ )
+ # [END howto_operator_gcp_bigtable_instance_update]
+
# [START howto_operator_gcp_bigtable_cluster_update]
cluster_update_task = BigtableUpdateClusterOperator(
project_id=GCP_PROJECT_ID,
@@ -186,6 +201,7 @@ with models.DAG(
create_instance_task \
>> create_table_task \
>> cluster_update_task \
+ >> update_instance_task \
>> delete_table_task
create_instance_task2 \
>> create_table_task2 \
diff --git a/airflow/providers/google/cloud/hooks/bigtable.py b/airflow/providers/google/cloud/hooks/bigtable.py
index 0627fcc..1cd277d 100644
--- a/airflow/providers/google/cloud/hooks/bigtable.py
+++ b/airflow/providers/google/cloud/hooks/bigtable.py
@@ -18,6 +18,7 @@
"""
This module contains a Google Cloud Bigtable Hook.
"""
+import enum
from typing import Dict, List, Optional, Sequence, Union
from google.cloud.bigtable import Client
@@ -184,6 +185,51 @@ class BigtableHook(GoogleBaseHook):
operation.result(timeout)
return instance
+ @GoogleBaseHook.fallback_to_default_project_id
+ def update_instance(
+ self,
+ instance_id: str,
+ project_id: str,
+ instance_display_name: Optional[str] = None,
+ instance_type: Optional[Union[enums.Instance.Type, enum.IntEnum]] = None,
+ instance_labels: Optional[Dict] = None,
+ timeout: Optional[float] = None
+ ) -> Instance:
+ """
+ Update an existing instance.
+
+ :type instance_id: str
+ :param instance_id: The ID for the existing instance.
+ :type project_id: str
+ :param project_id: Optional, Google Cloud Platform project ID where the
+ BigTable exists. If set to None or missing,
+ the default project_id from the GCP connection is used.
+ :type instance_display_name: str
+ :param instance_display_name: (optional) Human-readable name of the instance.
+ :type instance_type: enums.Instance.Type or enum.IntEnum
+ :param instance_type: (optional) The type of the instance.
+ :type instance_labels: dict
+ :param instance_labels: (optional) Dictionary of labels to associate with the
+ instance.
+ :type timeout: float
+ :param timeout: (optional) timeout (in seconds) for instance update.
+ If None or not specified, the call will wait indefinitely.
+ """
+ instance_type = enums.Instance.Type(instance_type)
+
+ instance = Instance(
+ instance_id=instance_id,
+ client=self._get_client(project_id=project_id),
+ display_name=instance_display_name,
+ instance_type=instance_type,
+ labels=instance_labels,
+ )
+
+ operation = instance.update()
+ operation.result(timeout)
+
+ return instance
+
@staticmethod
def create_table(
instance: Instance,
diff --git a/airflow/providers/google/cloud/operators/bigtable.py b/airflow/providers/google/cloud/operators/bigtable.py
index 7098fb1..b8bed03 100644
--- a/airflow/providers/google/cloud/operators/bigtable.py
+++ b/airflow/providers/google/cloud/operators/bigtable.py
@@ -18,7 +18,8 @@
"""
This module contains Google Cloud Bigtable operators.
"""
-from typing import Dict, Iterable, List, Optional
+import enum
+from typing import Dict, Iterable, List, Optional, Union
import google.api_core.exceptions
from google.cloud.bigtable.column_family import GarbageCollectionRule
@@ -158,6 +159,82 @@ class BigtableCreateInstanceOperator(BaseOperator, BigtableValidationMixin):
raise e
+class BigtableUpdateInstanceOperator(BaseOperator, BigtableValidationMixin):
+ """
+ Updates an existing Cloud Bigtable instance.
+
+ For more details about updating an instance, have a look at the reference:
+ https://googleapis.dev/python/bigtable/latest/instance.html#google.cloud.bigtable.instance.Instance.update
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:BigtableUpdateInstanceOperator`
+
+ :type instance_id: str
+ :param instance_id: The ID of the Cloud Bigtable instance to update.
+ :type project_id: str
+ :param project_id: Optional, the ID of the GCP project. If set to None or missing,
+ the default project_id from the GCP connection is used.
+ :type instance_display_name: str
+ :param instance_display_name: (optional) Human-readable name of the instance.
+ :type instance_type: enums.Instance.Type or enum.IntEnum
+ :param instance_type: (optional) The type of the instance.
+ :type instance_labels: dict
+ :param instance_labels: (optional) Dictionary of labels to associate
+ with the instance.
+ :type timeout: float
+ :param timeout: (optional) timeout (in seconds) for instance update.
+ If None or not specified, the operator will wait indefinitely.
+ :param gcp_conn_id: The connection ID to use to connect to Google Cloud Platform.
+ :type gcp_conn_id: str
+ """
+
+ REQUIRED_ATTRIBUTES: Iterable[str] = ['instance_id']
+ template_fields: Iterable[str] = ['project_id', 'instance_id']
+
+ @apply_defaults
+ def __init__(self, *,
+ instance_id: str,
+ project_id: Optional[str] = None,
+ instance_display_name: Optional[str] = None,
+ instance_type: Optional[Union[enums.Instance.Type, enum.IntEnum]] = None,
+ instance_labels: Optional[Dict] = None,
+ timeout: Optional[float] = None,
+ gcp_conn_id: str = 'google_cloud_default',
+ **kwargs) -> None:
+ self.project_id = project_id
+ self.instance_id = instance_id
+ self.instance_display_name = instance_display_name
+ self.instance_type = instance_type
+ self.instance_labels = instance_labels
+ self.timeout = timeout
+ self._validate_inputs()
+ self.gcp_conn_id = gcp_conn_id
+ super().__init__(**kwargs)
+
+ def execute(self, context):
+ hook = BigtableHook(gcp_conn_id=self.gcp_conn_id)
+ instance = hook.get_instance(project_id=self.project_id,
+ instance_id=self.instance_id)
+ if not instance:
+ raise AirflowException(
+ f"Dependency: instance '{self.instance_id}' does not exist."
+ )
+
+ try:
+ hook.update_instance(
+ project_id=self.project_id,
+ instance_id=self.instance_id,
+ instance_display_name=self.instance_display_name,
+ instance_type=self.instance_type,
+ instance_labels=self.instance_labels,
+ timeout=self.timeout,
+ )
+ except google.api_core.exceptions.GoogleAPICallError as e:
+ self.log.error('An error occurred. Exiting.')
+ raise e
+
+
class BigtableDeleteInstanceOperator(BaseOperator, BigtableValidationMixin):
"""
Deletes the Cloud Bigtable instance, including its clusters and all related tables.
diff --git a/docs/howto/operator/google/cloud/bigtable.rst b/docs/howto/operator/google/cloud/bigtable.rst
index befd2ea..b2f45bf 100644
--- a/docs/howto/operator/google/cloud/bigtable.rst
+++ b/docs/howto/operator/google/cloud/bigtable.rst
@@ -53,6 +53,29 @@ it will be retrieved from the GCP connection used. Both variants are shown:
:start-after: [START howto_operator_gcp_bigtable_instance_create]
:end-before: [END howto_operator_gcp_bigtable_instance_create]
+.. _howto/operator:BigtableUpdateInstanceOperator:
+
+BigtableUpdateInstanceOperator
+------------------------------
+
+Use the :class:`~airflow.providers.google.cloud.operators.bigtable.BigtableUpdateInstanceOperator`
+to update an existing Google Cloud Bigtable instance.
+
+Only the following configuration can be updated for an existing instance:
+instance_display_name, instance_type and instance_labels.
+
+Using the operator
+""""""""""""""""""
+
+You can create the operator with or without project id. If project id is missing
+it will be retrieved from the GCP connection used. The example below omits the project id:
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_bigtable.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_gcp_bigtable_instance_update]
+ :end-before: [END howto_operator_gcp_bigtable_instance_update]
+
.. _howto/operator:BigtableDeleteInstanceOperator:
BigtableDeleteInstanceOperator
diff --git a/tests/providers/google/cloud/hooks/test_bigtable.py b/tests/providers/google/cloud/hooks/test_bigtable.py
index 6939069..0d32e5f 100644
--- a/tests/providers/google/cloud/hooks/test_bigtable.py
+++ b/tests/providers/google/cloud/hooks/test_bigtable.py
@@ -22,6 +22,7 @@ import google
import mock
from google.cloud.bigtable import Client
from google.cloud.bigtable.instance import Instance
+from google.cloud.bigtable_admin_v2 import enums
from mock import PropertyMock
from airflow.providers.google.cloud.hooks.bigtable import BigtableHook
@@ -31,6 +32,9 @@ from tests.providers.google.cloud.utils.base_gcp_mock import (
)
CBT_INSTANCE = 'instance'
+CBT_INSTANCE_DISPLAY_NAME = "test instance"
+CBT_INSTANCE_TYPE = enums.Instance.Type.PRODUCTION
+CBT_INSTANCE_LABELS = {"env": "sit"}
CBT_CLUSTER = 'cluster'
CBT_ZONE = 'zone'
CBT_TABLE = 'table'
@@ -102,6 +106,23 @@ class TestBigtableHookNoDefaultProjectId(unittest.TestCase):
instance_create.assert_called_once_with(clusters=mock.ANY)
self.assertEqual(res.instance_id, 'instance')
+ @mock.patch('google.cloud.bigtable.instance.Instance.update')
+ @mock.patch('airflow.providers.google.cloud.hooks.bigtable.BigtableHook._get_client')
+ def test_update_instance_overridden_project_id(self, get_client, instance_update):
+ operation = mock.Mock()
+ operation.result_return_value = Instance(instance_id=CBT_INSTANCE, client=get_client)
+ instance_update.return_value = operation
+ res = self.bigtable_hook_no_default_project_id.update_instance(
+ project_id=GCP_PROJECT_ID_HOOK_UNIT_TEST,
+ instance_id=CBT_INSTANCE,
+ instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
+ instance_type=CBT_INSTANCE_TYPE,
+ instance_labels=CBT_INSTANCE_LABELS
+ )
+ get_client.assert_called_once_with(project_id='example-project')
+ instance_update.assert_called_once_with()
+ self.assertEqual(res.instance_id, 'instance')
+
@mock.patch('airflow.providers.google.cloud.hooks.bigtable.BigtableHook._get_client')
def test_delete_table_overridden_project_id(self, get_client):
instance_method = get_client.return_value.instance
@@ -268,6 +289,28 @@ class TestBigtableHookDefaultProjectId(unittest.TestCase):
instance_create.assert_called_once_with(clusters=mock.ANY)
self.assertEqual(res.instance_id, 'instance')
+ @mock.patch(
+ 'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id',
+ new_callable=PropertyMock,
+ return_value=GCP_PROJECT_ID_HOOK_UNIT_TEST
+ )
+ @mock.patch('google.cloud.bigtable.instance.Instance.update')
+ @mock.patch('airflow.providers.google.cloud.hooks.bigtable.BigtableHook._get_client')
+ def test_update_instance(self, get_client, instance_update, mock_project_id):
+ operation = mock.Mock()
+ operation.result_return_value = Instance(instance_id=CBT_INSTANCE, client=get_client)
+ instance_update.return_value = operation
+ res = self.bigtable_hook_default_project_id.update_instance(
+ instance_id=CBT_INSTANCE,
+ instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
+ instance_type=CBT_INSTANCE_TYPE,
+ instance_labels=CBT_INSTANCE_LABELS,
+ project_id=GCP_PROJECT_ID_HOOK_UNIT_TEST,
+ )
+ get_client.assert_called_once_with(project_id='example-project')
+ instance_update.assert_called_once_with()
+ self.assertEqual(res.instance_id, 'instance')
+
@mock.patch('google.cloud.bigtable.instance.Instance.create')
@mock.patch('airflow.providers.google.cloud.hooks.bigtable.BigtableHook._get_client')
def test_create_instance_overridden_project_id(self, get_client, instance_create):
diff --git a/tests/providers/google/cloud/operators/test_bigtable.py b/tests/providers/google/cloud/operators/test_bigtable.py
index d3c2dfe..618872e 100644
--- a/tests/providers/google/cloud/operators/test_bigtable.py
+++ b/tests/providers/google/cloud/operators/test_bigtable.py
@@ -23,12 +23,13 @@ import google.api_core.exceptions
import mock
from google.cloud.bigtable.column_family import MaxVersionsGCRule
from google.cloud.bigtable.instance import Instance
+from google.cloud.bigtable_admin_v2 import enums
from parameterized import parameterized
from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.operators.bigtable import (
BigtableCreateInstanceOperator, BigtableCreateTableOperator, BigtableDeleteInstanceOperator,
- BigtableDeleteTableOperator, BigtableUpdateClusterOperator,
+ BigtableDeleteTableOperator, BigtableUpdateClusterOperator, BigtableUpdateInstanceOperator,
)
PROJECT_ID = 'test_project_id'
@@ -37,6 +38,9 @@ CLUSTER_ID = 'test-cluster-id'
CLUSTER_ZONE = 'us-central1-f'
GCP_CONN_ID = 'test-gcp-conn-id'
NODES = 5
+INSTANCE_DISPLAY_NAME = "test instance"
+INSTANCE_TYPE = enums.Instance.Type.PRODUCTION
+INSTANCE_LABELS = {"env": "sit"}
TABLE_ID = 'test-table-id'
INITIAL_SPLIT_KEYS = [] # type: List
EMPTY_COLUMN_FAMILIES = {} # type: Dict
@@ -133,6 +137,141 @@ class TestBigtableInstanceCreate(unittest.TestCase):
)
+class TestBigtableInstanceUpdate(unittest.TestCase):
+ @mock.patch('airflow.providers.google.cloud.operators.bigtable.BigtableHook')
+ def test_delete_execute(self, mock_hook):
+ op = BigtableUpdateInstanceOperator(
+ project_id=PROJECT_ID,
+ instance_id=INSTANCE_ID,
+ instance_display_name=INSTANCE_DISPLAY_NAME,
+ instance_type=INSTANCE_TYPE,
+ instance_labels=INSTANCE_LABELS,
+ task_id="id",
+ gcp_conn_id=GCP_CONN_ID
+ )
+ op.execute(None)
+ mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
+ mock_hook.return_value.update_instance.assert_called_once_with(
+ project_id=PROJECT_ID,
+ instance_id=INSTANCE_ID,
+ instance_display_name=INSTANCE_DISPLAY_NAME,
+ instance_type=INSTANCE_TYPE,
+ instance_labels=INSTANCE_LABELS,
+ timeout=None
+ )
+
+ @mock.patch('airflow.providers.google.cloud.operators.bigtable.BigtableHook')
+ def test_update_execute_empty_project_id(self, mock_hook):
+ op = BigtableUpdateInstanceOperator(
+ instance_id=INSTANCE_ID,
+ instance_display_name=INSTANCE_DISPLAY_NAME,
+ instance_type=INSTANCE_TYPE,
+ instance_labels=INSTANCE_LABELS,
+ task_id="id",
+ gcp_conn_id=GCP_CONN_ID
+ )
+ op.execute(None)
+ mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
+ mock_hook.return_value.update_instance.assert_called_once_with(
+ project_id=None,
+ instance_id=INSTANCE_ID,
+ instance_display_name=INSTANCE_DISPLAY_NAME,
+ instance_type=INSTANCE_TYPE,
+ instance_labels=INSTANCE_LABELS,
+ timeout=None
+ )
+
+ @parameterized.expand([
+ ('instance_id', PROJECT_ID, ''),
+ ], testcase_func_name=lambda f, n, p: 'test_empty_attribute.empty_' + p.args[0])
+ @mock.patch('airflow.providers.google.cloud.operators.bigtable.BigtableHook')
+ def test_empty_attribute(self, missing_attribute, project_id, instance_id, mock_hook):
+ with self.assertRaises(AirflowException) as e:
+ BigtableUpdateInstanceOperator(
+ project_id=project_id,
+ instance_id=instance_id,
+ instance_display_name=INSTANCE_DISPLAY_NAME,
+ instance_type=INSTANCE_TYPE,
+ instance_labels=INSTANCE_LABELS,
+ task_id="id"
+ )
+ err = e.exception
+ self.assertEqual(str(err), 'Empty parameter: {}'.format(missing_attribute))
+ mock_hook.assert_not_called()
+
+ @mock.patch('airflow.providers.google.cloud.operators.bigtable.BigtableHook')
+ def test_update_instance_that_doesnt_exists(self, mock_hook):
+ mock_hook.return_value.get_instance.return_value = None
+
+ with self.assertRaises(AirflowException) as e:
+ op = BigtableUpdateInstanceOperator(
+ project_id=PROJECT_ID,
+ instance_id=INSTANCE_ID,
+ instance_display_name=INSTANCE_DISPLAY_NAME,
+ instance_type=INSTANCE_TYPE,
+ instance_labels=INSTANCE_LABELS,
+ task_id="id",
+ gcp_conn_id=GCP_CONN_ID
+ )
+ op.execute(None)
+
+ err = e.exception
+ self.assertEqual(str(err), "Dependency: instance '{}' does not exist.".format(
+ INSTANCE_ID))
+
+ mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
+ mock_hook.return_value.update_instance.assert_not_called()
+
+ @mock.patch('airflow.providers.google.cloud.operators.bigtable.BigtableHook')
+ def test_update_instance_that_doesnt_exists_empty_project_id(self, mock_hook):
+ mock_hook.return_value.get_instance.return_value = None
+
+ with self.assertRaises(AirflowException) as e:
+ op = BigtableUpdateInstanceOperator(
+ instance_id=INSTANCE_ID,
+ instance_display_name=INSTANCE_DISPLAY_NAME,
+ instance_type=INSTANCE_TYPE,
+ instance_labels=INSTANCE_LABELS,
+ task_id="id",
+ gcp_conn_id=GCP_CONN_ID
+ )
+ op.execute(None)
+
+ err = e.exception
+ self.assertEqual(str(err), "Dependency: instance '{}' does not exist.".format(
+ INSTANCE_ID))
+
+ mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
+ mock_hook.return_value.update_instance.assert_not_called()
+
+ @mock.patch('airflow.providers.google.cloud.operators.bigtable.BigtableHook')
+ def test_different_error_reraised(self, mock_hook):
+ op = BigtableUpdateInstanceOperator(
+ project_id=PROJECT_ID,
+ instance_id=INSTANCE_ID,
+ instance_display_name=INSTANCE_DISPLAY_NAME,
+ instance_type=INSTANCE_TYPE,
+ instance_labels=INSTANCE_LABELS,
+ task_id="id",
+ gcp_conn_id=GCP_CONN_ID
+ )
+ mock_hook.return_value.update_instance.side_effect = mock.Mock(
+ side_effect=google.api_core.exceptions.GoogleAPICallError('error'))
+
+ with self.assertRaises(google.api_core.exceptions.GoogleAPICallError):
+ op.execute(None)
+
+ mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
+ mock_hook.return_value.update_instance.assert_called_once_with(
+ project_id=PROJECT_ID,
+ instance_id=INSTANCE_ID,
+ instance_display_name=INSTANCE_DISPLAY_NAME,
+ instance_type=INSTANCE_TYPE,
+ instance_labels=INSTANCE_LABELS,
+ timeout=None
+ )
+
+
class TestBigtableClusterUpdate(unittest.TestCase):
@parameterized.expand([
('instance_id', PROJECT_ID, '', CLUSTER_ID, NODES),