You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2023/02/11 02:18:48 UTC
[beam] branch bigtable-cdc-feature-branch updated: Create metadata table in initialize stage if table doesn't exist. (#25364)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch bigtable-cdc-feature-branch
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/bigtable-cdc-feature-branch by this push:
new feef2e40881 Create metadata table in initialize stage if table doesn't exist. (#25364)
feef2e40881 is described below
commit feef2e40881e611c7eb794e5d43d8086e0f5e482
Author: Tony Tang <nf...@gmail.com>
AuthorDate: Fri Feb 10 21:18:40 2023 -0500
Create metadata table in initialize stage if table doesn't exist. (#25364)
* Create metadata table during initialize stage of Bigtable Change Stream connector
* Fix GcpApiSurfaceTest that broke because of the addition of Cloud Bigtable's Admin API
* Fix import
* Fix dependencies error
* Change Cloud Bigtable dependencies to depend on custom public artifact registry instead of local jars
* Remove unnecessary excludes
* Replace placeholder class with real implementation
---
.../org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +-
sdks/java/io/google-cloud-platform/build.gradle | 25 ++-
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 2 +-
.../changestreams/ChangeStreamMutation.java | 24 ---
.../changestreams/action/ChangeStreamAction.java | 6 +-
.../action/ReadChangeStreamPartitionAction.java | 2 +-
.../dao/BigtableChangeStreamAccessor.java | 220 +++++++++++++++++++++
.../changestreams/dao/ChangeStreamDao.java | 5 +-
.../gcp/bigtable/changestreams/dao/DaoFactory.java | 18 +-
.../changestreams/dao/MetadataTableAdminDao.java | 94 ++++++++-
.../changestreams/dao/MetadataTableDao.java | 21 +-
.../changestreams/dofn/InitializeDoFn.java | 20 +-
.../dofn/ReadChangeStreamPartitionDoFn.java | 2 +-
.../changestreams/encoder/package-info.java | 24 ---
.../apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java | 12 +-
.../changestreams/dofn/InitializeDoFnTest.java | 123 ++++++++++++
16 files changed, 533 insertions(+), 67 deletions(-)
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 414b3ccdc55..db956e46a21 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -600,7 +600,7 @@ class BeamModulePlugin implements Plugin<Project> {
google_cloud_bigquery_storage : "com.google.cloud:google-cloud-bigquerystorage", // google_cloud_platform_libraries_bom sets version
google_cloud_bigtable : "com.google.cloud:google-cloud-bigtable", // google_cloud_platform_libraries_bom sets version
google_cloud_bigtable_client_core : "com.google.cloud.bigtable:bigtable-client-core:1.26.3",
- google_cloud_bigtable_emulator : "com.google.cloud:google-cloud-bigtable-emulator:0.137.1",
+ google_cloud_bigtable_emulator : "com.google.cloud:google-cloud-bigtable-emulator:0.147.3",
google_cloud_core : "com.google.cloud:google-cloud-core", // google_cloud_platform_libraries_bom sets version
google_cloud_core_grpc : "com.google.cloud:google-cloud-core-grpc", // google_cloud_platform_libraries_bom sets version
google_cloud_datacatalog_v1beta1 : "com.google.cloud:google-cloud-datacatalog", // google_cloud_platform_libraries_bom sets version
diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle
index d4a143c6173..cc7c850c490 100644
--- a/sdks/java/io/google-cloud-platform/build.gradle
+++ b/sdks/java/io/google-cloud-platform/build.gradle
@@ -18,7 +18,13 @@
import groovy.json.JsonOutput
-plugins { id 'org.apache.beam.module' }
+plugins {
+ id 'org.apache.beam.module'
+ // Begin: temporary Cloud Bigtable dependencies. Resolves the preview Cloud Bigtable client
+ // (which includes the Change Stream API) from a custom Artifact Registry repository.
+ id "maven-publish"
+ id "com.google.cloud.artifactregistry.gradle-plugin" version "2.1.5"
+ // End: Temporary Cloud Bigtable dependencies.
+}
applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk.io.gcp',
enableSpotbugs: false,
@@ -27,6 +33,22 @@ applyJavaNature(
],
)
+// For resolving Cloud Bigtable dependencies from custom registry that includes Change Stream API.
+repositories {
+  maven {
+    url "artifactregistry://us-central1-maven.pkg.dev/cloud-bigtable-ecosystem/bigtable-change-streams-preview"
+  }
+}
+// All Cloud Bigtable client/proto/grpc artifacts must come from the same preview build, so the
+// version is declared once and every forced module references it.
+def bigtableChangeStreamsPreviewVersion = '2.11.2-change-streams-preview1-SNAPSHOT'
+configurations.all {
+  resolutionStrategy {
+    force group: 'com.google.cloud', name: 'google-cloud-bigtable', version: bigtableChangeStreamsPreviewVersion
+    force group: 'com.google.api.grpc', name: 'proto-google-cloud-bigtable-v2', version: bigtableChangeStreamsPreviewVersion
+    force group: 'com.google.api.grpc', name: 'grpc-google-cloud-bigtable-v2', version: bigtableChangeStreamsPreviewVersion
+    force group: 'com.google.api.grpc', name: 'proto-google-cloud-bigtable-admin-v2', version: bigtableChangeStreamsPreviewVersion
+    force group: 'com.google.api.grpc', name: 'grpc-google-cloud-bigtable-admin-v2', version: bigtableChangeStreamsPreviewVersion
+  }
+}
+
description = "Apache Beam :: SDKs :: Java :: IO :: Google Cloud Platform"
ext.summary = "IO library to read and write Google Cloud Platform systems from Beam."
@@ -165,6 +187,7 @@ dependencies {
testImplementation library.java.powermock_mockito
testImplementation library.java.joda_time
testImplementation library.java.google_cloud_spanner_test
+ testImplementation library.java.google_cloud_bigtable_emulator
testRuntimeOnly library.java.slf4j_jdk14
// everit_json is needed for PubsubLite SchemaTransform tests that rely on JSON-schema translation.
permitUnusedDeclared library.java.everit_json_schema
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 98826b9cf5c..d2eff9cbbfa 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -29,6 +29,7 @@ import com.google.bigtable.v2.RowFilter;
import com.google.bigtable.v2.SampleRowKeysResponse;
import com.google.cloud.Timestamp;
import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.time.Instant;
@@ -48,7 +49,6 @@ import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMutation;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMutation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMutation.java
deleted file mode 100644
index cc4cf467d29..00000000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMutation.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.bigtable.changestreams;
-
-/**
- * This is a placeholder class that will be replaced by updated Cloud Bigtable java client. The java
- * client is work in progress and will be checked in and updated soon.
- */
-public class ChangeStreamMutation {}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java
index 0f1d8ee30cf..012173a89cd 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java
@@ -17,10 +17,11 @@
*/
package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
+import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
+import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.protobuf.ByteString;
import java.util.Optional;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMutation;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress;
import org.apache.beam.sdk.transforms.DoFn;
@@ -94,8 +95,7 @@ public class ChangeStreamAction {
*/
public Optional<DoFn.ProcessContinuation> run(
PartitionRecord partitionRecord,
- Object record, // TODO: Update once bigtable client includes
- // https://github.com/googleapis/java-bigtable/pull/1569
+ ChangeStreamRecord record,
RestrictionTracker<StreamProgress, StreamProgress> tracker,
DoFn.OutputReceiver<KV<ByteString, ChangeStreamMutation>> receiver,
ManualWatermarkEstimator<Instant> watermarkEstimator,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
index ed7ffaa157b..d55838e5b85 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
@@ -17,10 +17,10 @@
*/
package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
+import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
import com.google.protobuf.ByteString;
import java.io.IOException;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMutation;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.DetectNewPartitionsDoFn;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/BigtableChangeStreamAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/BigtableChangeStreamAccessor.java
new file mode 100644
index 00000000000..68a2dfc043c
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/BigtableChangeStreamAccessor.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminClient;
+import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminSettings;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
+import com.google.cloud.bigtable.data.v2.BigtableDataClient;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.io.gcp.bigtable.BigtableConfig;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.threeten.bp.Duration;
+
+/**
+ * This is probably a temporary solution to what is a bigger migration from
+ * cloud-bigtable-client-core to java-bigtable.
+ *
+ * <p>This class creates and maintains the lifecycle of java-bigtable clients that interact with
+ * Cloud Bigtable. One accessor (data, table admin and instance admin clients) is cached per
+ * {@link BigtableConfig}, so a worker JVM holds a single set of clients per
+ * project/instance/table/app profile and jobs on the same machine share connections. Instances
+ * are reference counted: every {@link #getOrCreate} call must be paired with one {@link #close()}.
+ */
+public class BigtableChangeStreamAccessor implements AutoCloseable {
+  // Create one bigtable data/admin client per bigtable config (project/instance/table/app profile)
+  private static final ConcurrentHashMap<BigtableConfig, BigtableChangeStreamAccessor>
+      bigtableAccessors = new ConcurrentHashMap<>();
+
+  private static final ConcurrentHashMap<BigtableConfig, AtomicInteger> refcounts =
+      new ConcurrentHashMap<>();
+
+  private final BigtableDataClient dataClient;
+  private final BigtableTableAdminClient tableAdminClient;
+  private final BigtableInstanceAdminClient instanceAdminClient;
+  private final BigtableConfig bigtableConfig;
+
+  private BigtableChangeStreamAccessor(
+      BigtableDataClient dataClient,
+      BigtableTableAdminClient tableAdminClient,
+      BigtableInstanceAdminClient instanceAdminClient,
+      BigtableConfig bigtableConfig) {
+    this.dataClient = dataClient;
+    this.tableAdminClient = tableAdminClient;
+    this.instanceAdminClient = instanceAdminClient;
+    this.bigtableConfig = bigtableConfig;
+  }
+
+  /**
+   * Return the cached accessor for the config, creating it first if needed. Every call increments
+   * the reference count and must be matched by exactly one {@link #close()}.
+   *
+   * @param bigtableConfig config that contains all the parameters to connect to a Cloud Bigtable
+   *     instance
+   * @return data and admin clients connected to the Cloud Bigtable instance
+   * @throws IOException if creating the clients fails
+   */
+  public static synchronized BigtableChangeStreamAccessor getOrCreate(
+      @NonNull BigtableConfig bigtableConfig) throws IOException {
+    BigtableChangeStreamAccessor accessor = bigtableAccessors.get(bigtableConfig);
+    if (accessor == null) {
+      accessor = createAccessor(bigtableConfig);
+      bigtableAccessors.put(bigtableConfig, accessor);
+      refcounts.put(bigtableConfig, new AtomicInteger(0));
+    }
+    checkStateNotNull(refcounts.get(bigtableConfig)).incrementAndGet();
+    return accessor;
+  }
+
+  private static BigtableChangeStreamAccessor createAccessor(@NonNull BigtableConfig bigtableConfig)
+      throws IOException {
+    String projectId = checkArgumentNotNull(bigtableConfig.getProjectId()).get();
+    String instanceId = checkArgumentNotNull(bigtableConfig.getInstanceId()).get();
+    String appProfileId = checkArgumentNotNull(bigtableConfig.getAppProfileId()).get();
+    BigtableDataSettings.Builder dataSettingsBuilder = BigtableDataSettings.newBuilder();
+    BigtableTableAdminSettings.Builder tableAdminSettingsBuilder =
+        BigtableTableAdminSettings.newBuilder();
+    BigtableInstanceAdminSettings.Builder instanceAdminSettingsBuilder =
+        BigtableInstanceAdminSettings.newBuilder();
+
+    dataSettingsBuilder.setProjectId(projectId);
+    tableAdminSettingsBuilder.setProjectId(projectId);
+    instanceAdminSettingsBuilder.setProjectId(projectId);
+
+    dataSettingsBuilder.setInstanceId(instanceId);
+    tableAdminSettingsBuilder.setInstanceId(instanceId);
+
+    if (appProfileId != null) {
+      dataSettingsBuilder.setAppProfileId(appProfileId);
+    }
+
+    RetrySettings.Builder readRowRetrySettings =
+        dataSettingsBuilder.stubSettings().readRowSettings().retrySettings();
+    dataSettingsBuilder
+        .stubSettings()
+        .readRowSettings()
+        .setRetrySettings(
+            readRowRetrySettings
+                .setInitialRpcTimeout(Duration.ofSeconds(30))
+                .setTotalTimeout(Duration.ofSeconds(30))
+                .setMaxRpcTimeout(Duration.ofSeconds(30))
+                .setMaxAttempts(10)
+                .build());
+
+    RetrySettings.Builder readRowsRetrySettings =
+        dataSettingsBuilder.stubSettings().readRowsSettings().retrySettings();
+    dataSettingsBuilder
+        .stubSettings()
+        .readRowsSettings()
+        .setRetrySettings(
+            readRowsRetrySettings
+                .setInitialRpcTimeout(Duration.ofSeconds(30))
+                .setTotalTimeout(Duration.ofSeconds(30))
+                .setMaxRpcTimeout(Duration.ofSeconds(30))
+                .setMaxAttempts(10)
+                .build());
+
+    RetrySettings.Builder mutateRowRetrySettings =
+        dataSettingsBuilder.stubSettings().mutateRowSettings().retrySettings();
+    dataSettingsBuilder
+        .stubSettings()
+        .mutateRowSettings()
+        .setRetrySettings(
+            mutateRowRetrySettings
+                .setInitialRpcTimeout(Duration.ofSeconds(30))
+                .setTotalTimeout(Duration.ofSeconds(30))
+                .setMaxRpcTimeout(Duration.ofSeconds(30))
+                .setMaxAttempts(10)
+                .build());
+
+    RetrySettings.Builder checkAndMutateRowRetrySettings =
+        dataSettingsBuilder.stubSettings().checkAndMutateRowSettings().retrySettings();
+    dataSettingsBuilder
+        .stubSettings()
+        .checkAndMutateRowSettings()
+        .setRetrySettings(
+            checkAndMutateRowRetrySettings
+                .setInitialRpcTimeout(Duration.ofSeconds(30))
+                .setTotalTimeout(Duration.ofSeconds(30))
+                .setMaxRpcTimeout(Duration.ofSeconds(30))
+                .setMaxAttempts(10)
+                .build());
+
+    RetrySettings.Builder readChangeStreamRetrySettings =
+        dataSettingsBuilder.stubSettings().readChangeStreamSettings().retrySettings();
+    dataSettingsBuilder
+        .stubSettings()
+        .readChangeStreamSettings()
+        .setRetrySettings(
+            readChangeStreamRetrySettings
+                // Set timeouts to 60s - dataflow should checkpoint before then, but it is not
+                // guaranteed to happen after a specific duration. We still want a conservative
+                // timeout so that it can't hang.
+                .setInitialRpcTimeout(Duration.ofSeconds(60))
+                .setTotalTimeout(Duration.ofSeconds(60))
+                .setMaxRpcTimeout(Duration.ofSeconds(60))
+                .setMaxAttempts(3)
+                .build());
+
+    BigtableDataClient dataClient = BigtableDataClient.create(dataSettingsBuilder.build());
+    BigtableTableAdminClient tableAdminClient =
+        BigtableTableAdminClient.create(tableAdminSettingsBuilder.build());
+    BigtableInstanceAdminClient instanceAdminClient =
+        BigtableInstanceAdminClient.create(instanceAdminSettingsBuilder.build());
+    return new BigtableChangeStreamAccessor(
+        dataClient, tableAdminClient, instanceAdminClient, bigtableConfig);
+  }
+
+  public BigtableDataClient getDataClient() {
+    return dataClient;
+  }
+
+  public BigtableTableAdminClient getTableAdminClient() {
+    return tableAdminClient;
+  }
+
+  public BigtableInstanceAdminClient getInstanceAdminClient() {
+    return instanceAdminClient;
+  }
+
+  @Override
+  public void close() throws Exception {
+    // Same lock as static synchronized getOrCreate(), so lookups cannot interleave with teardown.
+    synchronized (BigtableChangeStreamAccessor.class) {
+      AtomicInteger refcount = refcounts.get(bigtableConfig);
+      // Ignore stale or extra close() calls; only the last holder of this instance tears it down.
+      if (bigtableAccessors.get(bigtableConfig) != this
+          || refcount == null
+          || refcount.decrementAndGet() > 0) {
+        return;
+      }
+      bigtableAccessors.remove(bigtableConfig);
+      refcounts.remove(bigtableConfig);
+    }
+    dataClient.close();
+    tableAdminClient.close();
+    instanceAdminClient.close();
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
index 98c6ae2899e..147b872dba0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao;
+import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -25,9 +26,11 @@ import org.slf4j.LoggerFactory;
public class ChangeStreamDao {
private static final Logger LOG = LoggerFactory.getLogger(ChangeStreamDao.class);
+ // Data client for the streamed table. NOTE(review): not read by any method of this class yet;
+ // presumably reserved for the upcoming change stream read calls.
+ private final BigtableDataClient dataClient;
private final String tableId;
- public ChangeStreamDao(String tableId) {
+ /**
+ * Create a DAO bound to a single Bigtable table.
+ *
+ * @param dataClient Cloud Bigtable data client to use for {@code tableId}
+ * @param tableId id of the Bigtable table this DAO operates on
+ */
+ public ChangeStreamDao(BigtableDataClient dataClient, String tableId) {
+ this.dataClient = dataClient;
this.tableId = tableId;
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java
index 6df9fa7a0a7..2a7e0dfcd89 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java
@@ -19,6 +19,9 @@ package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao;
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminClient;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
+import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import java.io.IOException;
import java.io.Serializable;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableConfig;
@@ -81,7 +84,9 @@ public class DaoFactory implements Serializable {
checkArgumentNotNull(changeStreamConfig.getInstanceId());
String tableId = checkArgumentNotNull(changeStreamConfig.getTableId()).get();
checkArgumentNotNull(changeStreamConfig.getAppProfileId());
- changeStreamDao = new ChangeStreamDao(tableId);
+ BigtableDataClient dataClient =
+ BigtableChangeStreamAccessor.getOrCreate(changeStreamConfig).getDataClient();
+ changeStreamDao = new ChangeStreamDao(dataClient, tableId);
}
return changeStreamDao;
}
@@ -92,8 +97,11 @@ public class DaoFactory implements Serializable {
checkArgumentNotNull(metadataTableConfig.getInstanceId());
checkArgumentNotNull(metadataTableConfig.getTableId());
checkArgumentNotNull(metadataTableConfig.getAppProfileId());
+ BigtableDataClient dataClient =
+ BigtableChangeStreamAccessor.getOrCreate(metadataTableConfig).getDataClient();
metadataTableDao =
new MetadataTableDao(
+ dataClient,
getMetadataTableAdminDao().getTableId(),
getMetadataTableAdminDao().getChangeStreamNamePrefix());
}
@@ -106,7 +114,13 @@ public class DaoFactory implements Serializable {
checkArgumentNotNull(metadataTableConfig.getInstanceId());
String tableId = checkArgumentNotNull(metadataTableConfig.getTableId()).get();
checkArgumentNotNull(metadataTableConfig.getAppProfileId());
- metadataTableAdminDao = new MetadataTableAdminDao(changeStreamName, tableId);
+ BigtableTableAdminClient tableAdminClient =
+ BigtableChangeStreamAccessor.getOrCreate(metadataTableConfig).getTableAdminClient();
+ BigtableInstanceAdminClient instanceAdminClient =
+ BigtableChangeStreamAccessor.getOrCreate(metadataTableConfig).getInstanceAdminClient();
+ metadataTableAdminDao =
+ new MetadataTableAdminDao(
+ tableAdminClient, instanceAdminClient, changeStreamName, tableId);
}
return metadataTableAdminDao;
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableAdminDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableAdminDao.java
index ba3bceb31dd..f98be03baf3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableAdminDao.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableAdminDao.java
@@ -17,7 +17,17 @@
*/
package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao;
+import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminClient;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
+import com.google.cloud.bigtable.admin.v2.models.AppProfile;
+import com.google.cloud.bigtable.admin.v2.models.AppProfile.SingleClusterRoutingPolicy;
+import com.google.cloud.bigtable.admin.v2.models.ColumnFamily;
+import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
+import com.google.cloud.bigtable.admin.v2.models.GCRules;
+import com.google.cloud.bigtable.admin.v2.models.ModifyColumnFamiliesRequest;
+import com.google.cloud.bigtable.admin.v2.models.Table;
import com.google.protobuf.ByteString;
+import java.util.List;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
/**
@@ -38,6 +48,7 @@ public class MetadataTableAdminDao {
public static final String CF_CONTINUATION_TOKEN = "continuation_token";
public static final String CF_LOCK = "lock";
public static final String CF_MISSING_PARTITIONS = "missing_partitions";
+ public static final String CF_VERSION = "version";
public static final String QUALIFIER_DEFAULT = "latest";
public static final ImmutableList<String> COLUMN_FAMILIES =
ImmutableList.of(
@@ -47,17 +58,31 @@ public class MetadataTableAdminDao {
CF_WATERMARK,
CF_CONTINUATION_TOKEN,
CF_LOCK,
- CF_MISSING_PARTITIONS);
+ CF_MISSING_PARTITIONS,
+ CF_VERSION);
public static final ByteString NEW_PARTITION_PREFIX = ByteString.copyFromUtf8("NewPartition#");
public static final ByteString STREAM_PARTITION_PREFIX =
ByteString.copyFromUtf8("StreamPartition#");
public static final ByteString DETECT_NEW_PARTITION_SUFFIX =
ByteString.copyFromUtf8("DetectNewPartition");
+ // Bump this version whenever the metadata table schema, or the way the table is used, changes.
+ // Different versions are incompatible and need to be fixed before the pipeline can continue;
+ // otherwise the pipeline may fail or even corrupt the metadata table.
+ public static final int CURRENT_METADATA_TABLE_VERSION = 1;
+
+ private final BigtableTableAdminClient tableAdminClient;
+ private final BigtableInstanceAdminClient instanceAdminClient;
private final String tableId;
private final ByteString changeStreamNamePrefix;
- public MetadataTableAdminDao(String changeStreamName, String tableId) {
+ /**
+ * Create an admin DAO for the change stream metadata table.
+ *
+ * @param tableAdminClient admin client used to inspect, create and modify the metadata table
+ * @param instanceAdminClient admin client used to look up app profiles
+ * @param changeStreamName name of the change stream; stored as the prefix
+ * {@code changeStreamName + "#"}
+ * @param tableId id of the metadata table
+ */
+ public MetadataTableAdminDao(
+ BigtableTableAdminClient tableAdminClient,
+ BigtableInstanceAdminClient instanceAdminClient,
+ String changeStreamName,
+ String tableId) {
+ this.tableAdminClient = tableAdminClient;
+ this.instanceAdminClient = instanceAdminClient;
this.tableId = tableId;
this.changeStreamNamePrefix = ByteString.copyFromUtf8(changeStreamName + "#");
}
@@ -79,4 +104,69 @@ public class MetadataTableAdminDao {
public String getTableId() {
return tableId;
}
+
+  /**
+   * Check whether an app profile is usable for metadata table operations. Those operations need
+   * read-after-write consistency, which requires single-cluster routing, and they rely on
+   * single-row transactions such as CheckAndMutateRow, which the profile must allow.
+   *
+   * @param appProfileId id of the app profile to inspect, looked up in this DAO's instance
+   * @return true only if the profile uses single-cluster routing with transactional writes allowed
+   */
+  public boolean isAppProfileSingleClusterAndTransactional(String appProfileId) {
+    String instanceId = tableAdminClient.getInstanceId();
+    AppProfile.RoutingPolicy policy =
+        instanceAdminClient.getAppProfile(instanceId, appProfileId).getPolicy();
+    if (!(policy instanceof SingleClusterRoutingPolicy)) {
+      return false;
+    }
+    return ((SingleClusterRoutingPolicy) policy).getAllowTransactionalWrites();
+  }
+
+  /**
+   * Ensure the metadata table exists and has every column family in {@code COLUMN_FAMILIES}. A
+   * missing table is created with all families; an existing table only gets the families it lacks.
+   * Every family keeps at most one cell version. The table needs to be created only once per
+   * instance and is shared by all change stream jobs; it is created in the same instance as the
+   * table being streamed. Access to it is not restricted, but editing it manually can leave Beam
+   * jobs in an inconsistent state.
+   *
+   * @return true if this call created the table; false if the table already existed (even when
+   *     missing column families were added to it)
+   */
+  public boolean createMetadataTable() {
+    GCRules.GCRule gcRules = GCRules.GCRULES.maxVersions(1);
+
+    if (!tableAdminClient.exists(tableId)) {
+      CreateTableRequest createTableRequest = CreateTableRequest.of(tableId);
+      COLUMN_FAMILIES.forEach(family -> createTableRequest.addFamily(family, gcRules));
+      tableAdminClient.createTable(createTableRequest);
+      return true;
+    }
+
+    // The table already exists: only add the column families it is missing.
+    Table table = tableAdminClient.getTable(tableId);
+    List<ColumnFamily> existingFamilies = table.getColumnFamilies();
+    ModifyColumnFamiliesRequest modifyRequest = ModifyColumnFamiliesRequest.of(tableId);
+    boolean anyMissing = false;
+    for (String family : COLUMN_FAMILIES) {
+      boolean present =
+          existingFamilies.stream().anyMatch(existing -> family.equals(existing.getId()));
+      if (!present) {
+        modifyRequest.addFamily(family, gcRules);
+        anyMissing = true;
+      }
+    }
+    if (anyMissing) {
+      tableAdminClient.modifyFamilies(modifyRequest);
+    }
+    return false;
+  }
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java
index b9f65f7a4a2..c4889018c89 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java
@@ -21,6 +21,8 @@ import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTabl
import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao.NEW_PARTITION_PREFIX;
import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao.STREAM_PARTITION_PREFIX;
+import com.google.cloud.bigtable.data.v2.BigtableDataClient;
+import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,10 +37,13 @@ import org.slf4j.LoggerFactory;
public class MetadataTableDao {
private static final Logger LOG = LoggerFactory.getLogger(MetadataTableDao.class);
+ // Client used to write rows (see writeDetectNewPartitionVersion) to the metadata table.
+ private final BigtableDataClient dataClient;
private final String tableId;
private final ByteString changeStreamNamePrefix;
- public MetadataTableDao(String tableId, ByteString changeStreamNamePrefix) {
+ /**
+ * Creates a DAO for reading and writing the metadata table.
+ *
+ * @param dataClient Bigtable data client used to mutate rows of the metadata table
+ * @param tableId id of the metadata table
+ * @param changeStreamNamePrefix prefix prepended to the row keys this DAO builds
+ */
+ public MetadataTableDao(
+ BigtableDataClient dataClient, String tableId, ByteString changeStreamNamePrefix) {
+ this.dataClient = dataClient;
this.tableId = tableId;
this.changeStreamNamePrefix = changeStreamNamePrefix;
}
@@ -74,4 +79,18 @@ public class MetadataTableDao {
+ /**
+ * Returns the key of the DetectNewPartition row: {@code changeStreamNamePrefix} followed by
+ * {@code DETECT_NEW_PARTITION_SUFFIX}.
+ */
private ByteString getFullDetectNewPartition() {
return changeStreamNamePrefix.concat(DETECT_NEW_PARTITION_SUFFIX);
}
+
+  /**
+   * Records {@code CURRENT_METADATA_TABLE_VERSION} in the DetectNewPartition row so that a later
+   * run can check whether the existing metadata table is compatible with this connector's code.
+   */
+  public void writeDetectNewPartitionVersion() {
+    ByteString detectNewPartitionRowKey = getFullDetectNewPartition();
+    RowMutation versionMutation = RowMutation.create(tableId, detectNewPartitionRowKey);
+    versionMutation.setCell(
+        MetadataTableAdminDao.CF_VERSION,
+        MetadataTableAdminDao.QUALIFIER_DEFAULT,
+        MetadataTableAdminDao.CURRENT_METADATA_TABLE_VERSION);
+    dataClient.mutateRow(versionMutation);
+  }
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/InitializeDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/InitializeDoFn.java
index c80fb4a4e10..ba56726768e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/InitializeDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/InitializeDoFn.java
@@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory;
* A DoFn responsible to initialize the metadata table and prepare it for managing the state of the
* pipeline.
*/
-@SuppressWarnings("UnusedVariable")
public class InitializeDoFn extends DoFn<byte[], com.google.cloud.Timestamp>
implements Serializable {
private static final long serialVersionUID = 1868189906451252363L;
@@ -53,6 +52,25 @@ public class InitializeDoFn extends DoFn<byte[], com.google.cloud.Timestamp>
LOG.info(daoFactory.getStreamTableDebugString());
LOG.info(daoFactory.getMetadataTableDebugString());
LOG.info("ChangeStreamName: " + daoFactory.getChangeStreamName());
+ if (!daoFactory
+ .getMetadataTableAdminDao()
+ .isAppProfileSingleClusterAndTransactional(this.metadataTableAppProfileId)) {
+ LOG.error(
+ "App profile id '"
+ + metadataTableAppProfileId
+ + "' provided to access metadata table needs to use single-cluster routing policy"
+ + " and allow single-row transactions.");
+ // Terminate this pipeline now.
+ return;
+ }
+ if (daoFactory.getMetadataTableAdminDao().createMetadataTable()) {
+ LOG.info("Created metadata table: " + daoFactory.getMetadataTableAdminDao().getTableId());
+ } else {
+ LOG.info(
+ "Reusing existing metadata table: " + daoFactory.getMetadataTableAdminDao().getTableId());
+ }
+
+ daoFactory.getMetadataTableDao().writeDetectNewPartitionVersion();
receiver.output(startTime);
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
index 62ffd39cc1e..a7871dd5414 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
@@ -17,10 +17,10 @@
*/
package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn;
+import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
import com.google.protobuf.ByteString;
import java.io.IOException;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMutation;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ChangeStreamAction;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/encoder/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/encoder/package-info.java
deleted file mode 100644
index d6388c1a56d..00000000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/encoder/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Encoders for writing and reading from Metadata Table for Google Cloud Bigtable Change Streams.
- */
-@Experimental
-package org.apache.beam.sdk.io.gcp.bigtable.changestreams.encoder;
-
-import org.apache.beam.sdk.annotations.Experimental;
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
index a0ade3fc229..df8dc0fd1cf 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
@@ -67,10 +67,16 @@ public class GcpApiSurfaceTest {
classesInPackage("com.google.api.gax.retrying"),
classesInPackage("com.google.api.gax.longrunning"),
classesInPackage("com.google.api.gax.rpc"),
+ classesInPackage("com.google.api.gax.grpc"),
+ classesInPackage("com.google.api.gax.tracing"),
+ classesInPackage("com.google.api.gax.core"),
+ classesInPackage("com.google.api.gax.batching"),
+ classesInPackage("com.google.api.gax.paging"),
classesInPackage("com.google.api.services.bigquery.model"),
classesInPackage("com.google.api.services.healthcare"),
classesInPackage("com.google.auth"),
classesInPackage("com.google.bigtable.v2"),
+ classesInPackage("com.google.bigtable.admin.v2"),
classesInPackage("com.google.cloud"),
classesInPackage("com.google.common.collect"),
classesInPackage("com.google.cloud.bigquery.storage.v1"),
@@ -80,8 +86,8 @@ public class GcpApiSurfaceTest {
classesInPackage("com.google.pubsub.v1"),
classesInPackage("com.google.cloud.pubsublite"),
Matchers.equalTo(com.google.api.gax.rpc.ApiException.class),
- Matchers.equalTo(com.google.api.gax.paging.Page.class),
Matchers.<Class<?>>equalTo(com.google.api.gax.rpc.StatusCode.class),
+ Matchers.<Class<?>>equalTo(com.google.api.resourcenames.ResourceName.class),
Matchers.<Class<?>>equalTo(com.google.common.base.Function.class),
Matchers.<Class<?>>equalTo(com.google.common.base.Optional.class),
Matchers.<Class<?>>equalTo(com.google.common.base.Supplier.class),
@@ -124,9 +130,7 @@ public class GcpApiSurfaceTest {
classesInPackage("org.apache.commons.logging"),
classesInPackage("org.codehaus.jackson"),
classesInPackage("org.joda.time"),
- Matchers.<Class<?>>equalTo(org.threeten.bp.Duration.class),
- Matchers.<Class<?>>equalTo(org.threeten.bp.format.ResolverStyle.class),
- classesInPackage("org.threeten.bp.temporal"),
+ classesInPackage("org.threeten.bp"),
classesInPackage("com.google.gson"));
assertThat(apiSurface, containsOnlyClassesMatching(allowedClasses));
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/InitializeDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/InitializeDoFnTest.java
new file mode 100644
index 00000000000..73587839f17
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/InitializeDoFnTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
+import com.google.cloud.bigtable.data.v2.BigtableDataClient;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.models.Row;
+import com.google.cloud.bigtable.emulator.v2.BigtableEmulatorRule;
+import java.io.IOException;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+/** Tests {@link InitializeDoFn} against a Bigtable emulator. */
+@RunWith(MockitoJUnitRunner.class)
+public class InitializeDoFnTest {
+  @ClassRule
+  public static final BigtableEmulatorRule BIGTABLE_EMULATOR_RULE = BigtableEmulatorRule.create();
+
+  @Mock private DaoFactory daoFactory;
+  // Not a @Mock: setUp() replaces it with a spy around a real DAO backed by the emulator.
+  private MetadataTableAdminDao metadataTableAdminDao;
+  private MetadataTableDao metadataTableDao;
+  @Mock private DoFn.OutputReceiver<Timestamp> outputReceiver;
+  private final String tableId = "table";
+
+  // Shared by all tests: created once in beforeClass() and closed in afterClass() so their
+  // underlying channels are not leaked.
+  private static BigtableDataClient dataClient;
+  private static BigtableTableAdminClient adminClient;
+
+  @BeforeClass
+  public static void beforeClass() throws IOException {
+    BigtableTableAdminSettings adminSettings =
+        BigtableTableAdminSettings.newBuilderForEmulator(BIGTABLE_EMULATOR_RULE.getPort())
+            .setProjectId("fake-project")
+            .setInstanceId("fake-instance")
+            .build();
+    adminClient = BigtableTableAdminClient.create(adminSettings);
+    BigtableDataSettings dataSettings =
+        BigtableDataSettings.newBuilderForEmulator(BIGTABLE_EMULATOR_RULE.getPort())
+            .setProjectId("fake-project")
+            .setInstanceId("fake-instance")
+            .build();
+    dataClient = BigtableDataClient.create(dataSettings);
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    // Null checks keep a failure in beforeClass() from being masked by an NPE here.
+    if (dataClient != null) {
+      dataClient.close();
+    }
+    if (adminClient != null) {
+      adminClient.close();
+    }
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    String changeStreamName = "changeStreamName";
+    metadataTableAdminDao =
+        spy(new MetadataTableAdminDao(adminClient, null, changeStreamName, tableId));
+    // The emulator has no app profiles, so bypass the app-profile validation.
+    doReturn(true).when(metadataTableAdminDao).isAppProfileSingleClusterAndTransactional(any());
+    when(daoFactory.getMetadataTableAdminDao()).thenReturn(metadataTableAdminDao);
+    metadataTableDao =
+        new MetadataTableDao(
+            dataClient, tableId, metadataTableAdminDao.getChangeStreamNamePrefix());
+    when(daoFactory.getMetadataTableDao()).thenReturn(metadataTableDao);
+    when(daoFactory.getChangeStreamName()).thenReturn(changeStreamName);
+  }
+
+  @After
+  public void tearDown() {
+    // Drop the metadata table so each test starts from an instance without one.
+    if (adminClient.exists(tableId)) {
+      adminClient.deleteTable(tableId);
+    }
+  }
+
+  /**
+   * Runs InitializeDoFn with no existing metadata table and checks that it emits the start time,
+   * creates the table, and writes exactly one version cell holding the current version.
+   */
+  @Test
+  public void testInitializeDefault() throws IOException {
+    Timestamp startTime = Timestamp.now();
+    InitializeDoFn initializeDoFn = new InitializeDoFn(daoFactory, "app-profile", startTime);
+    initializeDoFn.processElement(outputReceiver);
+    verify(outputReceiver, times(1)).output(startTime);
+    assertTrue(adminClient.exists(tableId));
+    Row row =
+        dataClient.readRow(
+            tableId,
+            metadataTableAdminDao
+                .getChangeStreamNamePrefix()
+                .concat(MetadataTableAdminDao.DETECT_NEW_PARTITION_SUFFIX));
+    assertNotNull(row);
+    assertEquals(
+        1,
+        row.getCells(MetadataTableAdminDao.CF_VERSION, MetadataTableAdminDao.QUALIFIER_DEFAULT)
+            .size());
+    assertEquals(
+        MetadataTableAdminDao.CURRENT_METADATA_TABLE_VERSION,
+        (int)
+            Longs.fromByteArray(
+                row.getCells(
+                        MetadataTableAdminDao.CF_VERSION, MetadataTableAdminDao.QUALIFIER_DEFAULT)
+                    .get(0)
+                    .getValue()
+                    .toByteArray()));
+  }
+}