You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by tu...@apache.org on 2021/01/27 11:26:06 UTC
[airflow] branch master updated: Fix and improve GCP BigTable hook
and system test (#13896)
This is an automated email from the ASF dual-hosted git repository.
turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 810c15e Fix and improve GCP BigTable hook and system test (#13896)
810c15e is described below
commit 810c15ed85d7bcde8d5b8bc44e1cbd4859e29d2e
Author: Tobiasz Kędzierski <to...@polidea.com>
AuthorDate: Wed Jan 27 12:25:40 2021 +0100
Fix and improve GCP BigTable hook and system test (#13896)
Improve environment variables in GCP BigTable system test.
It will help to parametrize system tests.
---
.../google/cloud/example_dags/example_bigtable.py | 32 ++++++------
airflow/providers/google/cloud/hooks/bigtable.py | 9 +++-
.../providers/google/cloud/hooks/test_bigtable.py | 58 ++++++++++++++++++++--
.../google/cloud/operators/test_bigtable_system.py | 7 +--
4 files changed, 81 insertions(+), 25 deletions(-)
diff --git a/airflow/providers/google/cloud/example_dags/example_bigtable.py b/airflow/providers/google/cloud/example_dags/example_bigtable.py
index ce852ed..fc62cdf 100644
--- a/airflow/providers/google/cloud/example_dags/example_bigtable.py
+++ b/airflow/providers/google/cloud/example_dags/example_bigtable.py
@@ -60,22 +60,22 @@ from airflow.providers.google.cloud.sensors.bigtable import BigtableTableReplica
from airflow.utils.dates import days_ago
GCP_PROJECT_ID = getenv('GCP_PROJECT_ID', 'example-project')
-CBT_INSTANCE_ID = getenv('CBT_INSTANCE_ID', 'some-instance-id')
-CBT_INSTANCE_DISPLAY_NAME = getenv('CBT_INSTANCE_DISPLAY_NAME', 'Human-readable name')
+CBT_INSTANCE_ID = getenv('GCP_BIG_TABLE_INSTANCE_ID', 'some-instance-id')
+CBT_INSTANCE_DISPLAY_NAME = getenv('GCP_BIG_TABLE_INSTANCE_DISPLAY_NAME', 'Human-readable name')
CBT_INSTANCE_DISPLAY_NAME_UPDATED = getenv(
- "CBT_INSTANCE_DISPLAY_NAME_UPDATED", "Human-readable name - updated"
+ "GCP_BIG_TABLE_INSTANCE_DISPLAY_NAME_UPDATED", f"{CBT_INSTANCE_DISPLAY_NAME} - updated"
)
-CBT_INSTANCE_TYPE = getenv('CBT_INSTANCE_TYPE', '2')
-CBT_INSTANCE_TYPE_PROD = getenv('CBT_INSTANCE_TYPE_PROD', '1')
-CBT_INSTANCE_LABELS = getenv('CBT_INSTANCE_LABELS', '{}')
-CBT_INSTANCE_LABELS_UPDATED = getenv('CBT_INSTANCE_LABELS', '{"env": "prod"}')
-CBT_CLUSTER_ID = getenv('CBT_CLUSTER_ID', 'some-cluster-id')
-CBT_CLUSTER_ZONE = getenv('CBT_CLUSTER_ZONE', 'europe-west1-b')
-CBT_CLUSTER_NODES = getenv('CBT_CLUSTER_NODES', '3')
-CBT_CLUSTER_NODES_UPDATED = getenv('CBT_CLUSTER_NODES_UPDATED', '5')
-CBT_CLUSTER_STORAGE_TYPE = getenv('CBT_CLUSTER_STORAGE_TYPE', '2')
-CBT_TABLE_ID = getenv('CBT_TABLE_ID', 'some-table-id')
-CBT_POKE_INTERVAL = getenv('CBT_POKE_INTERVAL', '60')
+CBT_INSTANCE_TYPE = getenv('GCP_BIG_TABLE_INSTANCE_TYPE', '2')
+CBT_INSTANCE_TYPE_PROD = getenv('GCP_BIG_TABLE_INSTANCE_TYPE_PROD', '1')
+CBT_INSTANCE_LABELS = getenv('GCP_BIG_TABLE_INSTANCE_LABELS', '{}')
+CBT_INSTANCE_LABELS_UPDATED = getenv('GCP_BIG_TABLE_INSTANCE_LABELS_UPDATED', '{"env": "prod"}')
+CBT_CLUSTER_ID = getenv('GCP_BIG_TABLE_CLUSTER_ID', 'some-cluster-id')
+CBT_CLUSTER_ZONE = getenv('GCP_BIG_TABLE_CLUSTER_ZONE', 'europe-west1-b')
+CBT_CLUSTER_NODES = getenv('GCP_BIG_TABLE_CLUSTER_NODES', '3')
+CBT_CLUSTER_NODES_UPDATED = getenv('GCP_BIG_TABLE_CLUSTER_NODES_UPDATED', '5')
+CBT_CLUSTER_STORAGE_TYPE = getenv('GCP_BIG_TABLE_CLUSTER_STORAGE_TYPE', '2')
+CBT_TABLE_ID = getenv('GCP_BIG_TABLE_TABLE_ID', 'some-table-id')
+CBT_POKE_INTERVAL = getenv('GCP_BIG_TABLE_POKE_INTERVAL', '60')
with models.DAG(
@@ -93,8 +93,8 @@ with models.DAG(
instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
instance_type=int(CBT_INSTANCE_TYPE),
instance_labels=json.loads(CBT_INSTANCE_LABELS),
- cluster_nodes=int(CBT_CLUSTER_NODES),
- cluster_storage_type=CBT_CLUSTER_STORAGE_TYPE,
+ cluster_nodes=None,
+ cluster_storage_type=int(CBT_CLUSTER_STORAGE_TYPE),
task_id='create_instance_task',
)
create_instance_task2 = BigtableCreateInstanceOperator(
diff --git a/airflow/providers/google/cloud/hooks/bigtable.py b/airflow/providers/google/cloud/hooks/bigtable.py
index c5a2fa1..60e309d 100644
--- a/airflow/providers/google/cloud/hooks/bigtable.py
+++ b/airflow/providers/google/cloud/hooks/bigtable.py
@@ -169,7 +169,14 @@ class BigtableHook(GoogleBaseHook):
instance_labels,
)
- clusters = [instance.cluster(main_cluster_id, main_cluster_zone, cluster_nodes, cluster_storage_type)]
+ cluster_kwargs = dict(
+ cluster_id=main_cluster_id,
+ location_id=main_cluster_zone,
+ default_storage_type=cluster_storage_type,
+ )
+ if instance_type != enums.Instance.Type.DEVELOPMENT and cluster_nodes:
+ cluster_kwargs["serve_nodes"] = cluster_nodes
+ clusters = [instance.cluster(**cluster_kwargs)]
if replica_cluster_id and replica_cluster_zone:
warnings.warn(
"The replica_cluster_id and replica_cluster_zone parameter have been deprecated."
diff --git a/tests/providers/google/cloud/hooks/test_bigtable.py b/tests/providers/google/cloud/hooks/test_bigtable.py
index a452c48..16437fe 100644
--- a/tests/providers/google/cloud/hooks/test_bigtable.py
+++ b/tests/providers/google/cloud/hooks/test_bigtable.py
@@ -309,7 +309,7 @@ class TestBigtableHookDefaultProjectId(unittest.TestCase):
@mock.patch('google.cloud.bigtable.instance.Instance.cluster')
@mock.patch('google.cloud.bigtable.instance.Instance.create')
@mock.patch('airflow.providers.google.cloud.hooks.bigtable.BigtableHook._get_client')
- def test_create_instance_with_one_replica_cluster(
+ def test_create_instance_with_one_replica_cluster_production(
self, get_client, instance_create, cluster, mock_project_id
):
operation = mock.Mock()
@@ -325,10 +325,57 @@ class TestBigtableHookDefaultProjectId(unittest.TestCase):
cluster_nodes=1,
cluster_storage_type=enums.StorageType.SSD,
project_id=GCP_PROJECT_ID_HOOK_UNIT_TEST,
+ instance_type=enums.Instance.Type.PRODUCTION,
)
cluster.assert_has_calls(
[
- unittest.mock.call(CBT_CLUSTER, CBT_ZONE, 1, enums.StorageType.SSD),
+ unittest.mock.call(
+ cluster_id=CBT_CLUSTER,
+ location_id=CBT_ZONE,
+ serve_nodes=1,
+ default_storage_type=enums.StorageType.SSD,
+ ),
+ unittest.mock.call(
+ CBT_REPLICA_CLUSTER_ID, CBT_REPLICA_CLUSTER_ZONE, 1, enums.StorageType.SSD
+ ),
+ ],
+ any_order=True,
+ )
+ get_client.assert_called_once_with(project_id='example-project')
+ instance_create.assert_called_once_with(clusters=mock.ANY)
+ assert res.instance_id == 'instance'
+
+ @mock.patch(
+ 'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id',
+ new_callable=PropertyMock,
+ return_value=GCP_PROJECT_ID_HOOK_UNIT_TEST,
+ )
+ @mock.patch('google.cloud.bigtable.instance.Instance.cluster')
+ @mock.patch('google.cloud.bigtable.instance.Instance.create')
+ @mock.patch('airflow.providers.google.cloud.hooks.bigtable.BigtableHook._get_client')
+ def test_create_instance_with_one_replica_cluster_development(
+ self, get_client, instance_create, cluster, mock_project_id
+ ):
+ operation = mock.Mock()
+ operation.result_return_value = Instance(instance_id=CBT_INSTANCE, client=get_client)
+ instance_create.return_value = operation
+
+ res = self.bigtable_hook_default_project_id.create_instance(
+ instance_id=CBT_INSTANCE,
+ main_cluster_id=CBT_CLUSTER,
+ main_cluster_zone=CBT_ZONE,
+ replica_cluster_id=CBT_REPLICA_CLUSTER_ID,
+ replica_cluster_zone=CBT_REPLICA_CLUSTER_ZONE,
+ cluster_nodes=1,
+ cluster_storage_type=enums.StorageType.SSD,
+ project_id=GCP_PROJECT_ID_HOOK_UNIT_TEST,
+ instance_type=enums.Instance.Type.DEVELOPMENT,
+ )
+ cluster.assert_has_calls(
+ [
+ unittest.mock.call(
+ cluster_id=CBT_CLUSTER, location_id=CBT_ZONE, default_storage_type=enums.StorageType.SSD
+ ),
unittest.mock.call(
CBT_REPLICA_CLUSTER_ID, CBT_REPLICA_CLUSTER_ZONE, 1, enums.StorageType.SSD
),
@@ -365,7 +412,12 @@ class TestBigtableHookDefaultProjectId(unittest.TestCase):
)
cluster.assert_has_calls(
[
- unittest.mock.call(CBT_CLUSTER, CBT_ZONE, 1, enums.StorageType.SSD),
+ unittest.mock.call(
+ cluster_id=CBT_CLUSTER,
+ location_id=CBT_ZONE,
+ serve_nodes=1,
+ default_storage_type=enums.StorageType.SSD,
+ ),
unittest.mock.call('replica-1', 'us-west1-a', 1, enums.StorageType.SSD),
unittest.mock.call('replica-2', 'us-central1-f', 1, enums.StorageType.SSD),
unittest.mock.call('replica-3', 'us-east1-d', 1, enums.StorageType.SSD),
diff --git a/tests/providers/google/cloud/operators/test_bigtable_system.py b/tests/providers/google/cloud/operators/test_bigtable_system.py
index b987731..ea83493 100644
--- a/tests/providers/google/cloud/operators/test_bigtable_system.py
+++ b/tests/providers/google/cloud/operators/test_bigtable_system.py
@@ -15,16 +15,13 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-import os
import pytest
+from airflow.providers.google.cloud.example_dags.example_bigtable import CBT_INSTANCE_ID, GCP_PROJECT_ID
from tests.providers.google.cloud.utils.gcp_authenticator import GCP_BIGTABLE_KEY
from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
-GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
-CBT_INSTANCE = os.environ.get('CBT_INSTANCE_ID', 'testinstance')
-
@pytest.mark.backend("mysql", "postgres")
@pytest.mark.credential_file(GCP_BIGTABLE_KEY)
@@ -45,7 +42,7 @@ class BigTableExampleDagsSystemTest(GoogleSystemTest):
'--verbosity=none',
'instances',
'delete',
- CBT_INSTANCE,
+ CBT_INSTANCE_ID,
],
key=GCP_BIGTABLE_KEY,
)