You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2023/02/11 02:18:48 UTC

[beam] branch bigtable-cdc-feature-branch updated: Create metadata table in initialize stage if table doesn't exist. (#25364)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch bigtable-cdc-feature-branch
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/bigtable-cdc-feature-branch by this push:
     new feef2e40881 Create metadata table in initialize stage if table doesn't exist. (#25364)
feef2e40881 is described below

commit feef2e40881e611c7eb794e5d43d8086e0f5e482
Author: Tony Tang <nf...@gmail.com>
AuthorDate: Fri Feb 10 21:18:40 2023 -0500

    Create metadata table in initialize stage if table doesn't exist. (#25364)
    
    * Create metadata table during initialize stage of Bigtable Change Stream connector
    
    * Fix GcpApiSurfaceTest that broke because of the addition of Cloud Bigtable's Admin API
    
    * Fix import
    
    * Fix dependencies error
    
    * Change Cloud Bigtable dependencies to depend on custom public artifact registry instead of local jars
    
    * Remove unnecessary excludes
    
    * Replace placeholder class with real implementation
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   2 +-
 sdks/java/io/google-cloud-platform/build.gradle    |  25 ++-
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java       |   2 +-
 .../changestreams/ChangeStreamMutation.java        |  24 ---
 .../changestreams/action/ChangeStreamAction.java   |   6 +-
 .../action/ReadChangeStreamPartitionAction.java    |   2 +-
 .../dao/BigtableChangeStreamAccessor.java          | 220 +++++++++++++++++++++
 .../changestreams/dao/ChangeStreamDao.java         |   5 +-
 .../gcp/bigtable/changestreams/dao/DaoFactory.java |  18 +-
 .../changestreams/dao/MetadataTableAdminDao.java   |  94 ++++++++-
 .../changestreams/dao/MetadataTableDao.java        |  21 +-
 .../changestreams/dofn/InitializeDoFn.java         |  20 +-
 .../dofn/ReadChangeStreamPartitionDoFn.java        |   2 +-
 .../changestreams/encoder/package-info.java        |  24 ---
 .../apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java  |  12 +-
 .../changestreams/dofn/InitializeDoFnTest.java     | 123 ++++++++++++
 16 files changed, 533 insertions(+), 67 deletions(-)

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 414b3ccdc55..db956e46a21 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -600,7 +600,7 @@ class BeamModulePlugin implements Plugin<Project> {
         google_cloud_bigquery_storage               : "com.google.cloud:google-cloud-bigquerystorage", // google_cloud_platform_libraries_bom sets version
         google_cloud_bigtable                       : "com.google.cloud:google-cloud-bigtable", // google_cloud_platform_libraries_bom sets version
         google_cloud_bigtable_client_core           : "com.google.cloud.bigtable:bigtable-client-core:1.26.3",
-        google_cloud_bigtable_emulator              : "com.google.cloud:google-cloud-bigtable-emulator:0.137.1",
+        google_cloud_bigtable_emulator              : "com.google.cloud:google-cloud-bigtable-emulator:0.147.3",
         google_cloud_core                           : "com.google.cloud:google-cloud-core", // google_cloud_platform_libraries_bom sets version
         google_cloud_core_grpc                      : "com.google.cloud:google-cloud-core-grpc", // google_cloud_platform_libraries_bom sets version
         google_cloud_datacatalog_v1beta1            : "com.google.cloud:google-cloud-datacatalog", // google_cloud_platform_libraries_bom sets version
diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle
index d4a143c6173..cc7c850c490 100644
--- a/sdks/java/io/google-cloud-platform/build.gradle
+++ b/sdks/java/io/google-cloud-platform/build.gradle
@@ -18,7 +18,13 @@
 
 import groovy.json.JsonOutput
 
-plugins { id 'org.apache.beam.module' }
+plugins {
+  id 'org.apache.beam.module'
+  // For resolving Cloud Bigtable dependencies from custom registry that includes Change Stream API.
+  id "maven-publish"
+  id "com.google.cloud.artifactregistry.gradle-plugin" version "2.1.5"
+  // End: Temporary Cloud Bigtable dependencies.
+}
 applyJavaNature(
   automaticModuleName: 'org.apache.beam.sdk.io.gcp',
   enableSpotbugs: false,
@@ -27,6 +33,22 @@ applyJavaNature(
   ],
 )
 
+// For resolving Cloud Bigtable dependencies from custom registry that includes Change Stream API.
+repositories {
+  maven {
+    url "artifactregistry://us-central1-maven.pkg.dev/cloud-bigtable-ecosystem/bigtable-change-streams-preview"
+  }
+}
+configurations.all {
+  resolutionStrategy {
+    force group: 'com.google.cloud', name: 'google-cloud-bigtable', version: '2.11.2-change-streams-preview1-SNAPSHOT'
+    force group: 'com.google.api.grpc', name: 'proto-google-cloud-bigtable-v2', version: '2.11.2-change-streams-preview1-SNAPSHOT'
+    force group: 'com.google.api.grpc', name: 'grpc-google-cloud-bigtable-v2', version: '2.11.2-change-streams-preview1-SNAPSHOT'
+    force group: 'com.google.api.grpc', name: 'proto-google-cloud-bigtable-admin-v2', version: '2.11.2-change-streams-preview1-SNAPSHOT'
+    force group: 'com.google.api.grpc', name: 'grpc-google-cloud-bigtable-admin-v2', version: '2.11.2-change-streams-preview1-SNAPSHOT'
+  }
+}
+
 description = "Apache Beam :: SDKs :: Java :: IO :: Google Cloud Platform"
 ext.summary = "IO library to read and write Google Cloud Platform systems from Beam."
 
@@ -165,6 +187,7 @@ dependencies {
   testImplementation library.java.powermock_mockito
   testImplementation library.java.joda_time
   testImplementation library.java.google_cloud_spanner_test
+  testImplementation library.java.google_cloud_bigtable_emulator
   testRuntimeOnly library.java.slf4j_jdk14
   // everit_json is needed for PubsubLite SchemaTransform tests that rely on JSON-schema translation.
   permitUnusedDeclared library.java.everit_json_schema
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 98826b9cf5c..d2eff9cbbfa 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -29,6 +29,7 @@ import com.google.bigtable.v2.RowFilter;
 import com.google.bigtable.v2.SampleRowKeysResponse;
 import com.google.cloud.Timestamp;
 import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.time.Instant;
@@ -48,7 +49,6 @@ import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMutation;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMutation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMutation.java
deleted file mode 100644
index cc4cf467d29..00000000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMutation.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.bigtable.changestreams;
-
-/**
- * This is a placeholder class that will be replaced by updated Cloud Bigtable java client. The java
- * client is work in progress and will be checked in and updated soon.
- */
-public class ChangeStreamMutation {}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java
index 0f1d8ee30cf..012173a89cd 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java
@@ -17,10 +17,11 @@
  */
 package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
 
+import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
+import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
 import com.google.protobuf.ByteString;
 import java.util.Optional;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMutation;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -94,8 +95,7 @@ public class ChangeStreamAction {
    */
   public Optional<DoFn.ProcessContinuation> run(
       PartitionRecord partitionRecord,
-      Object record, // TODO: Update once bigtable client includes
-      // https://github.com/googleapis/java-bigtable/pull/1569
+      ChangeStreamRecord record,
       RestrictionTracker<StreamProgress, StreamProgress> tracker,
       DoFn.OutputReceiver<KV<ByteString, ChangeStreamMutation>> receiver,
       ManualWatermarkEstimator<Instant> watermarkEstimator,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
index ed7ffaa157b..d55838e5b85 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
@@ -17,10 +17,10 @@
  */
 package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
 
+import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMutation;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.DetectNewPartitionsDoFn;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/BigtableChangeStreamAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/BigtableChangeStreamAccessor.java
new file mode 100644
index 00000000000..68a2dfc043c
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/BigtableChangeStreamAccessor.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminClient;
+import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminSettings;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
+import com.google.cloud.bigtable.data.v2.BigtableDataClient;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.io.gcp.bigtable.BigtableConfig;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.threeten.bp.Duration;
+
+/**
+ * This is probably a temporary solution to what is a bigger migration from
+ * cloud-bigtable-client-core to java-bigtable.
+ *
+ * <p>This class creates and manages the lifecycle of the java-bigtable clients used to interact
+ * with Cloud Bigtable. It creates singletons of the data and admin clients for each
+ * project/instance/table/app profile. Per worker, there should only be 1 instance of the client
+ * for each table/app profile. This ensures we're not creating excessive connections to the
+ * backend and that jobs on the same machine share the same set of connections.
+ */
+public class BigtableChangeStreamAccessor implements AutoCloseable {
+  // Create one bigtable data/admin client per bigtable config (project/instance/table/app profile)
+  private static final ConcurrentHashMap<BigtableConfig, BigtableChangeStreamAccessor>
+      bigtableAccessors = new ConcurrentHashMap<>();
+
+  private static final ConcurrentHashMap<BigtableConfig, AtomicInteger> refcounts =
+      new ConcurrentHashMap<>();
+
+  private final BigtableDataClient dataClient;
+  private final BigtableTableAdminClient tableAdminClient;
+  private final BigtableInstanceAdminClient instanceAdminClient;
+  private final BigtableConfig bigtableConfig;
+
+  private BigtableChangeStreamAccessor(
+      BigtableDataClient dataClient,
+      BigtableTableAdminClient tableAdminClient,
+      BigtableInstanceAdminClient instanceAdminClient,
+      BigtableConfig bigtableConfig) {
+    this.dataClient = dataClient;
+    this.tableAdminClient = tableAdminClient;
+    this.instanceAdminClient = instanceAdminClient;
+    this.bigtableConfig = bigtableConfig;
+  }
+
+  /**
+   * Create a BigtableAccess if it doesn't exist and store it in the cache for faster access. If it
+   * does exist, just return it.
+   *
+   * @param bigtableConfig config that contains all the parameters to connect to a Cloud Bigtable
+   *     instance
+   * @return data and admin clients connected to the Cloud Bigtable instance
+   * @throws IOException if the connection fails
+   */
+  public static synchronized BigtableChangeStreamAccessor getOrCreate(
+      @NonNull BigtableConfig bigtableConfig) throws IOException {
+    if (bigtableAccessors.get(bigtableConfig) == null) {
+      BigtableChangeStreamAccessor bigtableAccessor =
+          BigtableChangeStreamAccessor.createAccessor(bigtableConfig);
+      bigtableAccessors.put(bigtableConfig, bigtableAccessor);
+      refcounts.putIfAbsent(bigtableConfig, new AtomicInteger(0));
+    }
+    checkStateNotNull(refcounts.get(bigtableConfig)).incrementAndGet();
+    return checkStateNotNull(bigtableAccessors.get(bigtableConfig));
+  }
+
+  private static BigtableChangeStreamAccessor createAccessor(@NonNull BigtableConfig bigtableConfig)
+      throws IOException {
+    String projectId = checkArgumentNotNull(bigtableConfig.getProjectId()).get();
+    String instanceId = checkArgumentNotNull(bigtableConfig.getInstanceId()).get();
+    String appProfileId = checkArgumentNotNull(bigtableConfig.getAppProfileId()).get();
+    BigtableDataSettings.Builder dataSettingsBuilder = BigtableDataSettings.newBuilder();
+    BigtableTableAdminSettings.Builder tableAdminSettingsBuilder =
+        BigtableTableAdminSettings.newBuilder();
+    BigtableInstanceAdminSettings.Builder instanceAdminSettingsBuilder =
+        BigtableInstanceAdminSettings.newBuilder();
+
+    dataSettingsBuilder.setProjectId(projectId);
+    tableAdminSettingsBuilder.setProjectId(projectId);
+    instanceAdminSettingsBuilder.setProjectId(projectId);
+
+    dataSettingsBuilder.setInstanceId(instanceId);
+    tableAdminSettingsBuilder.setInstanceId(instanceId);
+
+    if (appProfileId != null) {
+      dataSettingsBuilder.setAppProfileId(appProfileId);
+    }
+
+    RetrySettings.Builder readRowRetrySettings =
+        dataSettingsBuilder.stubSettings().readRowSettings().retrySettings();
+    dataSettingsBuilder
+        .stubSettings()
+        .readRowSettings()
+        .setRetrySettings(
+            readRowRetrySettings
+                .setInitialRpcTimeout(Duration.ofSeconds(30))
+                .setTotalTimeout(Duration.ofSeconds(30))
+                .setMaxRpcTimeout(Duration.ofSeconds(30))
+                .setMaxAttempts(10)
+                .build());
+
+    RetrySettings.Builder readRowsRetrySettings =
+        dataSettingsBuilder.stubSettings().readRowsSettings().retrySettings();
+    dataSettingsBuilder
+        .stubSettings()
+        .readRowsSettings()
+        .setRetrySettings(
+            readRowsRetrySettings
+                .setInitialRpcTimeout(Duration.ofSeconds(30))
+                .setTotalTimeout(Duration.ofSeconds(30))
+                .setMaxRpcTimeout(Duration.ofSeconds(30))
+                .setMaxAttempts(10)
+                .build());
+
+    RetrySettings.Builder mutateRowRetrySettings =
+        dataSettingsBuilder.stubSettings().mutateRowSettings().retrySettings();
+    dataSettingsBuilder
+        .stubSettings()
+        .mutateRowSettings()
+        .setRetrySettings(
+            mutateRowRetrySettings
+                .setInitialRpcTimeout(Duration.ofSeconds(30))
+                .setTotalTimeout(Duration.ofSeconds(30))
+                .setMaxRpcTimeout(Duration.ofSeconds(30))
+                .setMaxAttempts(10)
+                .build());
+
+    RetrySettings.Builder checkAndMutateRowRetrySettings =
+        dataSettingsBuilder.stubSettings().checkAndMutateRowSettings().retrySettings();
+    dataSettingsBuilder
+        .stubSettings()
+        .checkAndMutateRowSettings()
+        .setRetrySettings(
+            checkAndMutateRowRetrySettings
+                .setInitialRpcTimeout(Duration.ofSeconds(30))
+                .setTotalTimeout(Duration.ofSeconds(30))
+                .setMaxRpcTimeout(Duration.ofSeconds(30))
+                .setMaxAttempts(10)
+                .build());
+
+    RetrySettings.Builder readChangeStreamRetrySettings =
+        dataSettingsBuilder.stubSettings().readChangeStreamSettings().retrySettings();
+    dataSettingsBuilder
+        .stubSettings()
+        .readChangeStreamSettings()
+        .setRetrySettings(
+            readChangeStreamRetrySettings
+                // Set timeouts to 60s - dataflow should checkpoint before then, but it is not
+                // guaranteed to happen after a specific duration. We still want a conservative
+                // timeout so that it can't hang.
+                .setInitialRpcTimeout(Duration.ofSeconds(60))
+                .setTotalTimeout(Duration.ofSeconds(60))
+                .setMaxRpcTimeout(Duration.ofSeconds(60))
+                .setMaxAttempts(3)
+                .build());
+
+    BigtableDataClient dataClient = BigtableDataClient.create(dataSettingsBuilder.build());
+    BigtableTableAdminClient tableAdminClient =
+        BigtableTableAdminClient.create(tableAdminSettingsBuilder.build());
+    BigtableInstanceAdminClient instanceAdminClient =
+        BigtableInstanceAdminClient.create(instanceAdminSettingsBuilder.build());
+    return new BigtableChangeStreamAccessor(
+        dataClient, tableAdminClient, instanceAdminClient, bigtableConfig);
+  }
+
+  public BigtableDataClient getDataClient() {
+    return dataClient;
+  }
+
+  public BigtableTableAdminClient getTableAdminClient() {
+    return tableAdminClient;
+  }
+
+  public BigtableInstanceAdminClient getInstanceAdminClient() {
+    return instanceAdminClient;
+  }
+
+  @Override
+  public void close() throws Exception {
+    // Take the class monitor: it is the lock held by the static synchronized getOrCreate(), so
+    // a concurrent getOrCreate() cannot hand out this accessor while it is being shut down.
+    synchronized (BigtableChangeStreamAccessor.class) {
+      AtomicInteger refcount = refcounts.get(bigtableConfig);
+      // No entry means this config was already fully released; closing again is a no-op.
+      if (refcount == null || refcount.decrementAndGet() > 0) {
+        return;
+      }
+      bigtableAccessors.remove(bigtableConfig);
+      refcounts.remove(bigtableConfig);
+      dataClient.close();
+      tableAdminClient.close();
+      instanceAdminClient.close();
+    }
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
index 98c6ae2899e..147b872dba0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao;
 
+import com.google.cloud.bigtable.data.v2.BigtableDataClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -25,9 +26,11 @@ import org.slf4j.LoggerFactory;
 public class ChangeStreamDao {
   private static final Logger LOG = LoggerFactory.getLogger(ChangeStreamDao.class);
 
+  private final BigtableDataClient dataClient;
   private final String tableId;
 
-  public ChangeStreamDao(String tableId) {
+  public ChangeStreamDao(BigtableDataClient dataClient, String tableId) {
+    this.dataClient = dataClient;
     this.tableId = tableId;
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java
index 6df9fa7a0a7..2a7e0dfcd89 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java
@@ -19,6 +19,9 @@ package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao;
 
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 
+import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminClient;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
+import com.google.cloud.bigtable.data.v2.BigtableDataClient;
 import java.io.IOException;
 import java.io.Serializable;
 import org.apache.beam.sdk.io.gcp.bigtable.BigtableConfig;
@@ -81,7 +84,9 @@ public class DaoFactory implements Serializable {
       checkArgumentNotNull(changeStreamConfig.getInstanceId());
       String tableId = checkArgumentNotNull(changeStreamConfig.getTableId()).get();
       checkArgumentNotNull(changeStreamConfig.getAppProfileId());
-      changeStreamDao = new ChangeStreamDao(tableId);
+      BigtableDataClient dataClient =
+          BigtableChangeStreamAccessor.getOrCreate(changeStreamConfig).getDataClient();
+      changeStreamDao = new ChangeStreamDao(dataClient, tableId);
     }
     return changeStreamDao;
   }
@@ -92,8 +97,11 @@ public class DaoFactory implements Serializable {
       checkArgumentNotNull(metadataTableConfig.getInstanceId());
       checkArgumentNotNull(metadataTableConfig.getTableId());
       checkArgumentNotNull(metadataTableConfig.getAppProfileId());
+      BigtableDataClient dataClient =
+          BigtableChangeStreamAccessor.getOrCreate(metadataTableConfig).getDataClient();
       metadataTableDao =
           new MetadataTableDao(
+              dataClient,
               getMetadataTableAdminDao().getTableId(),
               getMetadataTableAdminDao().getChangeStreamNamePrefix());
     }
@@ -106,7 +114,13 @@ public class DaoFactory implements Serializable {
       checkArgumentNotNull(metadataTableConfig.getInstanceId());
       String tableId = checkArgumentNotNull(metadataTableConfig.getTableId()).get();
       checkArgumentNotNull(metadataTableConfig.getAppProfileId());
-      metadataTableAdminDao = new MetadataTableAdminDao(changeStreamName, tableId);
+      BigtableTableAdminClient tableAdminClient =
+          BigtableChangeStreamAccessor.getOrCreate(metadataTableConfig).getTableAdminClient();
+      BigtableInstanceAdminClient instanceAdminClient =
+          BigtableChangeStreamAccessor.getOrCreate(metadataTableConfig).getInstanceAdminClient();
+      metadataTableAdminDao =
+          new MetadataTableAdminDao(
+              tableAdminClient, instanceAdminClient, changeStreamName, tableId);
     }
     return metadataTableAdminDao;
   }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableAdminDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableAdminDao.java
index ba3bceb31dd..f98be03baf3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableAdminDao.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableAdminDao.java
@@ -17,7 +17,17 @@
  */
 package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao;
 
+import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminClient;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
+import com.google.cloud.bigtable.admin.v2.models.AppProfile;
+import com.google.cloud.bigtable.admin.v2.models.AppProfile.SingleClusterRoutingPolicy;
+import com.google.cloud.bigtable.admin.v2.models.ColumnFamily;
+import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
+import com.google.cloud.bigtable.admin.v2.models.GCRules;
+import com.google.cloud.bigtable.admin.v2.models.ModifyColumnFamiliesRequest;
+import com.google.cloud.bigtable.admin.v2.models.Table;
 import com.google.protobuf.ByteString;
+import java.util.List;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 
 /**
@@ -38,6 +48,7 @@ public class MetadataTableAdminDao {
   public static final String CF_CONTINUATION_TOKEN = "continuation_token";
   public static final String CF_LOCK = "lock";
   public static final String CF_MISSING_PARTITIONS = "missing_partitions";
+  public static final String CF_VERSION = "version";
   public static final String QUALIFIER_DEFAULT = "latest";
   public static final ImmutableList<String> COLUMN_FAMILIES =
       ImmutableList.of(
@@ -47,17 +58,31 @@ public class MetadataTableAdminDao {
           CF_WATERMARK,
           CF_CONTINUATION_TOKEN,
           CF_LOCK,
-          CF_MISSING_PARTITIONS);
+          CF_MISSING_PARTITIONS,
+          CF_VERSION);
   public static final ByteString NEW_PARTITION_PREFIX = ByteString.copyFromUtf8("NewPartition#");
   public static final ByteString STREAM_PARTITION_PREFIX =
       ByteString.copyFromUtf8("StreamPartition#");
   public static final ByteString DETECT_NEW_PARTITION_SUFFIX =
       ByteString.copyFromUtf8("DetectNewPartition");
 
+  // If the metadata table schema or how the table is used changes, this version should be bumped.
+  // Different versions are incompatible and need to be fixed before the pipeline can continue.
+  // Otherwise, the pipeline may fail or even cause corruption in the metadata table.
+  public static final int CURRENT_METADATA_TABLE_VERSION = 1;
+
+  private final BigtableTableAdminClient tableAdminClient;
+  private final BigtableInstanceAdminClient instanceAdminClient;
   private final String tableId;
   private final ByteString changeStreamNamePrefix;
 
-  public MetadataTableAdminDao(String changeStreamName, String tableId) {
+  public MetadataTableAdminDao(
+      BigtableTableAdminClient tableAdminClient,
+      BigtableInstanceAdminClient instanceAdminClient,
+      String changeStreamName,
+      String tableId) {
+    this.tableAdminClient = tableAdminClient;
+    this.instanceAdminClient = instanceAdminClient;
     this.tableId = tableId;
     this.changeStreamNamePrefix = ByteString.copyFromUtf8(changeStreamName + "#");
   }
@@ -79,4 +104,69 @@ public class MetadataTableAdminDao {
   public String getTableId() {
     return tableId;
   }
+
+  /**
+   * Verify the app profile uses single-cluster routing with single-row transactions enabled. For
+   * metadata table operations, the app profile needs to use single-cluster routing because they
+   * require read-after-write consistency. Also, the operations depend on single-row transactional
+   * operations like CheckAndMutateRow.
+   *
+   * @param appProfileId id of the app profile to look up in the table admin client's instance
+   * @return true if the profile is single-cluster and allows single-row transactions, else false
+   */
+  public boolean isAppProfileSingleClusterAndTransactional(String appProfileId) {
+    AppProfile appProfile =
+        instanceAdminClient.getAppProfile(tableAdminClient.getInstanceId(), appProfileId);
+    if (appProfile.getPolicy() instanceof SingleClusterRoutingPolicy) {
+      SingleClusterRoutingPolicy routingPolicy =
+          (SingleClusterRoutingPolicy) appProfile.getPolicy();
+      return routingPolicy.getAllowTransactionalWrites();
+    }
+    return false;
+  }
+
+  /**
+   * Create the metadata table if it does not exist yet. If the table does exist, verify that all
+   * the column families exist and add any that are missing. This table only needs to be created
+   * once per instance. All change stream jobs will use this table. This table is created in the
+   * same instance as the table being streamed. While we don't restrict access to the table,
+   * manually editing the table can lead to inconsistent beam jobs.
+   *
+   * @return true if the table was created by this call, false if it already existed.
+   */
+  public boolean createMetadataTable() {
+    GCRules.GCRule gcRules = GCRules.GCRULES.maxVersions(1);
+
+    if (tableAdminClient.exists(tableId)) {
+      Table table = tableAdminClient.getTable(tableId);
+      List<ColumnFamily> currentCFs = table.getColumnFamilies();
+      ModifyColumnFamiliesRequest request = ModifyColumnFamiliesRequest.of(tableId);
+      boolean needsNewColumnFamily = false;
+      for (String targetCF : COLUMN_FAMILIES) {
+        boolean exists = false;
+        for (ColumnFamily currentCF : currentCFs) {
+          if (targetCF.equals(currentCF.getId())) {
+            exists = true;
+            break;
+          }
+        }
+        if (!exists) {
+          needsNewColumnFamily = true;
+          request.addFamily(targetCF, gcRules);
+        }
+      }
+      if (needsNewColumnFamily) {
+        tableAdminClient.modifyFamilies(request);
+      }
+      return false;
+    }
+
+    CreateTableRequest createTableRequest = CreateTableRequest.of(tableId);
+    for (String cf : COLUMN_FAMILIES) {
+      createTableRequest.addFamily(cf, gcRules);
+    }
+
+    tableAdminClient.createTable(createTableRequest);
+    return true;
+  }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java
index b9f65f7a4a2..c4889018c89 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java
@@ -21,6 +21,8 @@ import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTabl
 import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao.NEW_PARTITION_PREFIX;
 import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao.STREAM_PARTITION_PREFIX;
 
+import com.google.cloud.bigtable.data.v2.BigtableDataClient;
+import com.google.cloud.bigtable.data.v2.models.RowMutation;
 import com.google.protobuf.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,10 +37,13 @@ import org.slf4j.LoggerFactory;
 public class MetadataTableDao {
   private static final Logger LOG = LoggerFactory.getLogger(MetadataTableDao.class);
 
+  private final BigtableDataClient dataClient;
   private final String tableId;
   private final ByteString changeStreamNamePrefix;
 
-  public MetadataTableDao(String tableId, ByteString changeStreamNamePrefix) {
+  public MetadataTableDao(
+      BigtableDataClient dataClient, String tableId, ByteString changeStreamNamePrefix) {
+    this.dataClient = dataClient;
     this.tableId = tableId;
     this.changeStreamNamePrefix = changeStreamNamePrefix;
   }
@@ -74,4 +79,18 @@ public class MetadataTableDao {
   private ByteString getFullDetectNewPartition() {
     return changeStreamNamePrefix.concat(DETECT_NEW_PARTITION_SUFFIX);
   }
+
+  /**
+   * Set the version number for DetectNewPartition. This value can be checked later to verify that
+   * the existing metadata table is compatible with current beam connector code.
+   */
+  public void writeDetectNewPartitionVersion() {
+    RowMutation rowMutation =
+        RowMutation.create(tableId, getFullDetectNewPartition())
+            .setCell(
+                MetadataTableAdminDao.CF_VERSION,
+                MetadataTableAdminDao.QUALIFIER_DEFAULT,
+                MetadataTableAdminDao.CURRENT_METADATA_TABLE_VERSION);
+    dataClient.mutateRow(rowMutation);
+  }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/InitializeDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/InitializeDoFn.java
index c80fb4a4e10..ba56726768e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/InitializeDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/InitializeDoFn.java
@@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory;
  * A DoFn responsible to initialize the metadata table and prepare it for managing the state of the
  * pipeline.
  */
-@SuppressWarnings("UnusedVariable")
 public class InitializeDoFn extends DoFn<byte[], com.google.cloud.Timestamp>
     implements Serializable {
   private static final long serialVersionUID = 1868189906451252363L;
@@ -53,6 +52,25 @@ public class InitializeDoFn extends DoFn<byte[], com.google.cloud.Timestamp>
     LOG.info(daoFactory.getStreamTableDebugString());
     LOG.info(daoFactory.getMetadataTableDebugString());
     LOG.info("ChangeStreamName: " + daoFactory.getChangeStreamName());
+    if (!daoFactory
+        .getMetadataTableAdminDao()
+        .isAppProfileSingleClusterAndTransactional(this.metadataTableAppProfileId)) {
+      LOG.error(
+          "App profile id '"
+              + metadataTableAppProfileId
+              + "' provided to access metadata table needs to use single-cluster routing policy"
+              + " and allow single-row transactions.");
+      // Stop here: return without outputting startTime so downstream stages receive no input.
+      return;
+    }
+    if (daoFactory.getMetadataTableAdminDao().createMetadataTable()) {
+      LOG.info("Created metadata table: " + daoFactory.getMetadataTableAdminDao().getTableId());
+    } else {
+      LOG.info(
+          "Reusing existing metadata table: " + daoFactory.getMetadataTableAdminDao().getTableId());
+    }
+
+    daoFactory.getMetadataTableDao().writeDetectNewPartitionVersion();
 
     receiver.output(startTime);
   }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
index 62ffd39cc1e..a7871dd5414 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
@@ -17,10 +17,10 @@
  */
 package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn;
 
+import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMutation;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ChangeStreamAction;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/encoder/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/encoder/package-info.java
deleted file mode 100644
index d6388c1a56d..00000000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/encoder/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Encoders for writing and reading from Metadata Table for Google Cloud Bigtable Change Streams.
- */
-@Experimental
-package org.apache.beam.sdk.io.gcp.bigtable.changestreams.encoder;
-
-import org.apache.beam.sdk.annotations.Experimental;
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
index a0ade3fc229..df8dc0fd1cf 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
@@ -67,10 +67,16 @@ public class GcpApiSurfaceTest {
             classesInPackage("com.google.api.gax.retrying"),
             classesInPackage("com.google.api.gax.longrunning"),
             classesInPackage("com.google.api.gax.rpc"),
+            classesInPackage("com.google.api.gax.grpc"),
+            classesInPackage("com.google.api.gax.tracing"),
+            classesInPackage("com.google.api.gax.core"),
+            classesInPackage("com.google.api.gax.batching"),
+            classesInPackage("com.google.api.gax.paging"),
             classesInPackage("com.google.api.services.bigquery.model"),
             classesInPackage("com.google.api.services.healthcare"),
             classesInPackage("com.google.auth"),
             classesInPackage("com.google.bigtable.v2"),
+            classesInPackage("com.google.bigtable.admin.v2"),
             classesInPackage("com.google.cloud"),
             classesInPackage("com.google.common.collect"),
             classesInPackage("com.google.cloud.bigquery.storage.v1"),
@@ -80,8 +86,8 @@ public class GcpApiSurfaceTest {
             classesInPackage("com.google.pubsub.v1"),
             classesInPackage("com.google.cloud.pubsublite"),
             Matchers.equalTo(com.google.api.gax.rpc.ApiException.class),
-            Matchers.equalTo(com.google.api.gax.paging.Page.class),
             Matchers.<Class<?>>equalTo(com.google.api.gax.rpc.StatusCode.class),
+            Matchers.<Class<?>>equalTo(com.google.api.resourcenames.ResourceName.class),
             Matchers.<Class<?>>equalTo(com.google.common.base.Function.class),
             Matchers.<Class<?>>equalTo(com.google.common.base.Optional.class),
             Matchers.<Class<?>>equalTo(com.google.common.base.Supplier.class),
@@ -124,9 +130,7 @@ public class GcpApiSurfaceTest {
             classesInPackage("org.apache.commons.logging"),
             classesInPackage("org.codehaus.jackson"),
             classesInPackage("org.joda.time"),
-            Matchers.<Class<?>>equalTo(org.threeten.bp.Duration.class),
-            Matchers.<Class<?>>equalTo(org.threeten.bp.format.ResolverStyle.class),
-            classesInPackage("org.threeten.bp.temporal"),
+            classesInPackage("org.threeten.bp"),
             classesInPackage("com.google.gson"));
 
     assertThat(apiSurface, containsOnlyClassesMatching(allowedClasses));
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/InitializeDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/InitializeDoFnTest.java
new file mode 100644
index 00000000000..73587839f17
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/InitializeDoFnTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
+import com.google.cloud.bigtable.data.v2.BigtableDataClient;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.models.Row;
+import com.google.cloud.bigtable.emulator.v2.BigtableEmulatorRule;
+import java.io.IOException;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class InitializeDoFnTest {
+  @ClassRule
+  public static final BigtableEmulatorRule BIGTABLE_EMULATOR_RULE = BigtableEmulatorRule.create();
+
+  @Mock private DaoFactory daoFactory;
+  @Mock private transient MetadataTableAdminDao metadataTableAdminDao;
+  private transient MetadataTableDao metadataTableDao;
+  @Mock private DoFn.OutputReceiver<Timestamp> outputReceiver;
+  private final String tableId = "table";
+
+  private static BigtableDataClient dataClient;
+  private static BigtableTableAdminClient adminClient;
+
+  @BeforeClass
+  public static void beforeClass() throws IOException {
+    BigtableTableAdminSettings adminSettings =
+        BigtableTableAdminSettings.newBuilderForEmulator(BIGTABLE_EMULATOR_RULE.getPort())
+            .setProjectId("fake-project")
+            .setInstanceId("fake-instance")
+            .build();
+    adminClient = BigtableTableAdminClient.create(adminSettings);
+    BigtableDataSettings dataSettingsBuilder =
+        BigtableDataSettings.newBuilderForEmulator(BIGTABLE_EMULATOR_RULE.getPort())
+            .setProjectId("fake-project")
+            .setInstanceId("fake-instance")
+            .build();
+    dataClient = BigtableDataClient.create(dataSettingsBuilder);
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    String changeStreamName = "changeStreamName";
+    metadataTableAdminDao =
+        spy(new MetadataTableAdminDao(adminClient, null, changeStreamName, tableId));
+    doReturn(true).when(metadataTableAdminDao).isAppProfileSingleClusterAndTransactional(any());
+    when(daoFactory.getMetadataTableAdminDao()).thenReturn(metadataTableAdminDao);
+    metadataTableDao =
+        new MetadataTableDao(
+            dataClient, tableId, metadataTableAdminDao.getChangeStreamNamePrefix());
+    when(daoFactory.getMetadataTableDao()).thenReturn(metadataTableDao);
+    when(daoFactory.getChangeStreamName()).thenReturn(changeStreamName);
+  }
+
+  @Test
+  public void testInitializeDefault() throws IOException {
+    Timestamp startTime = Timestamp.now();
+    InitializeDoFn initializeDoFn = new InitializeDoFn(daoFactory, "app-profile", startTime);
+    initializeDoFn.processElement(outputReceiver);
+    verify(outputReceiver, times(1)).output(startTime);
+    assertTrue(adminClient.exists(tableId));
+    Row row =
+        dataClient.readRow(
+            tableId,
+            metadataTableAdminDao
+                .getChangeStreamNamePrefix()
+                .concat(MetadataTableAdminDao.DETECT_NEW_PARTITION_SUFFIX));
+    assertNotNull(row);
+    assertEquals(
+        1,
+        row.getCells(MetadataTableAdminDao.CF_VERSION, MetadataTableAdminDao.QUALIFIER_DEFAULT)
+            .size());
+    assertEquals(
+        MetadataTableAdminDao.CURRENT_METADATA_TABLE_VERSION,
+        (int)
+            Longs.fromByteArray(
+                row.getCells(
+                        MetadataTableAdminDao.CF_VERSION, MetadataTableAdminDao.QUALIFIER_DEFAULT)
+                    .get(0)
+                    .getValue()
+                    .toByteArray()));
+  }
+}