You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2020/12/16 21:32:21 UTC

[beam] branch master updated: [BEAM-11407] Add IT test to Bigtable for BeamSQL

This is an automated email from the ASF dual-hosted git repository.

amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3cacd4a  [BEAM-11407] Add IT test to Bigtable for BeamSQL
     new 1a5505b  Merge pull request #13512 from piotr-szuberski/bigtable-it
3cacd4a is described below

commit 3cacd4a1acaa5e37b326970aa47a89ee025ad1b4
Author: Piotr Szuberski <pi...@polidea.com>
AuthorDate: Mon Dec 14 09:21:51 2020 +0100

    [BEAM-11407] Add IT test to Bigtable for BeamSQL
---
 .../provider/bigtable/BigtableClientWrapper.java   | 115 ++++++++++
 .../BigtableTableCreationFailuresTest.java         |   2 +-
 .../provider/bigtable/BigtableTableFlatTest.java   | 110 +++++-----
 .../meta/provider/bigtable/BigtableTableIT.java    | 200 +++++++++++++++++
 .../meta/provider/bigtable/BigtableTableTest.java  |  94 --------
 .../provider/bigtable/BigtableTableTestUtils.java  | 237 +++++++++++++++++++++
 .../bigtable/BigtableTableWithRowsTest.java        | 120 +++++------
 .../io/gcp/testing/BigtableEmulatorWrapper.java    |  75 -------
 .../beam/sdk/io/gcp/testing/BigtableTestUtils.java | 153 -------------
 .../beam/sdk/io/gcp/testing/BigtableUtils.java     |  49 +++++
 .../bigtable/BeamRowToBigtableMutationTest.java    |  14 +-
 .../gcp/bigtable/BigtableRowToBeamRowFlatTest.java |  14 +-
 .../io/gcp/bigtable/BigtableRowToBeamRowTest.java  |  17 +-
 .../{TestUtils.java => BigtableTestUtils.java}     |  50 ++++-
 .../sdk/io/gcp/bigtable/CellValueParserTest.java   |  13 +-
 15 files changed, 786 insertions(+), 477 deletions(-)

diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableClientWrapper.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableClientWrapper.java
new file mode 100644
index 0000000..6a8b343
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableClientWrapper.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.bigtable;
+
+import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.byteString;
+import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.byteStringUtf8;
+
+import com.google.auth.Credentials;
+import com.google.bigtable.admin.v2.ColumnFamily;
+import com.google.bigtable.admin.v2.DeleteTableRequest;
+import com.google.bigtable.admin.v2.Table;
+import com.google.bigtable.v2.MutateRowRequest;
+import com.google.bigtable.v2.Mutation;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.config.CredentialOptions;
+import com.google.cloud.bigtable.grpc.BigtableDataClient;
+import com.google.cloud.bigtable.grpc.BigtableSession;
+import com.google.cloud.bigtable.grpc.BigtableTableAdminClient;
+import java.io.IOException;
+import java.io.Serializable;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+class BigtableClientWrapper implements Serializable {
+  private final BigtableTableAdminClient tableAdminClient;
+  private final BigtableDataClient dataClient;
+  private final BigtableSession session;
+  private final BigtableOptions bigtableOptions;
+
+  BigtableClientWrapper(
+      String project,
+      String instanceId,
+      @Nullable Integer emulatorPort,
+      @Nullable Credentials gcpCredentials)
+      throws IOException {
+    BigtableOptions.Builder optionsBuilder =
+        BigtableOptions.builder()
+            .setProjectId(project)
+            .setInstanceId(instanceId)
+            .setUserAgent("apache-beam-test");
+    if (emulatorPort != null) {
+      optionsBuilder.enableEmulator("localhost", emulatorPort);
+    }
+    if (gcpCredentials != null) {
+      optionsBuilder.setCredentialOptions(CredentialOptions.credential(gcpCredentials));
+    }
+    bigtableOptions = optionsBuilder.build();
+
+    session = new BigtableSession(bigtableOptions);
+    tableAdminClient = session.getTableAdminClient();
+    dataClient = session.getDataClient();
+  }
+
+  void writeRow(
+      String key,
+      String table,
+      String familyColumn,
+      String columnQualifier,
+      byte[] value,
+      long timestampMicros) {
+    Mutation.SetCell setCell =
+        Mutation.SetCell.newBuilder()
+            .setFamilyName(familyColumn)
+            .setColumnQualifier(byteStringUtf8(columnQualifier))
+            .setValue(byteString(value))
+            .setTimestampMicros(timestampMicros)
+            .build();
+    Mutation mutation = Mutation.newBuilder().setSetCell(setCell).build();
+    MutateRowRequest mutateRowRequest =
+        MutateRowRequest.newBuilder()
+            .setRowKey(byteStringUtf8(key))
+            .setTableName(bigtableOptions.getInstanceName().toTableNameStr(table))
+            .addMutations(mutation)
+            .build();
+    dataClient.mutateRow(mutateRowRequest);
+  }
+
+  void createTable(String tableName, String familyName) {
+    Table.Builder tableBuilder = Table.newBuilder();
+    tableBuilder.putColumnFamilies(familyName, ColumnFamily.newBuilder().build());
+
+    String instanceName = bigtableOptions.getInstanceName().toString();
+    com.google.bigtable.admin.v2.CreateTableRequest.Builder createTableRequestBuilder =
+        com.google.bigtable.admin.v2.CreateTableRequest.newBuilder()
+            .setParent(instanceName)
+            .setTableId(tableName)
+            .setTable(tableBuilder.build());
+    tableAdminClient.createTable(createTableRequestBuilder.build());
+  }
+
+  void deleteTable(String tableId) {
+    final String tableName = bigtableOptions.getInstanceName().toTableNameStr(tableId);
+    DeleteTableRequest.Builder deleteTableRequestBuilder =
+        DeleteTableRequest.newBuilder().setName(tableName);
+    tableAdminClient.deleteTable(deleteTableRequestBuilder.build());
+  }
+
+  void closeSession() throws IOException {
+    session.close();
+  }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableCreationFailuresTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableCreationFailuresTest.java
index 3f6f0a2..04cb0e1 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableCreationFailuresTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableCreationFailuresTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.extensions.sql.meta.provider.bigtable;
 
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.checkMessage;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.checkMessage;
 import static org.junit.Assert.assertThrows;
 
 import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java
index b336e43..9ea0ac8 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java
@@ -17,13 +17,18 @@
  */
 package org.apache.beam.sdk.extensions.sql.meta.provider.bigtable;
 
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.FAMILY_TEST;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.KEY1;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.KEY2;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.TEST_FLAT_SCHEMA;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.bigTableRow;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.columnsMappingString;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.createFlatTableString;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.createReadTable;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.flatRow;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.setFixedTimestamp;
 import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.COLUMNS_MAPPING;
 import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.KEY;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.KEY1;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.KEY2;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.TEST_FLAT_SCHEMA;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.bigTableRow;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.setFixedTimestamp;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
@@ -31,6 +36,8 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import com.alibaba.fastjson.JSONObject;
+import com.google.cloud.bigtable.emulator.v2.BigtableEmulatorRule;
+import java.io.IOException;
 import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
@@ -44,45 +51,49 @@ import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
-public class BigtableTableFlatTest extends BigtableTableTest {
+public class BigtableTableFlatTest {
 
+  @ClassRule
+  public static final BigtableEmulatorRule BIGTABLE_EMULATOR = BigtableEmulatorRule.create();
+
+  @Rule public TestPipeline readPipeline = TestPipeline.create();
   @Rule public TestPipeline writePipeline = TestPipeline.create();
 
-  private String createFlatTableString(String table) {
-    return "CREATE EXTERNAL TABLE "
-        + table
-        + "( \n"
-        + "  key VARCHAR NOT NULL, \n"
-        + "  boolColumn BOOLEAN NOT NULL, \n"
-        + "  longColumn BIGINT NOT NULL, \n"
-        + "  stringColumn VARCHAR NOT NULL, \n"
-        + "  doubleColumn DOUBLE NOT NULL \n"
-        + ") \n"
-        + "TYPE bigtable \n"
-        + "LOCATION '"
-        + getLocation(table)
-        + "' \n"
-        + "TBLPROPERTIES '{ \n"
-        + "  \"columnsMapping\": \""
-        + columnsMappingString()
-        + "\"}'";
+  private static BigtableClientWrapper emulatorWrapper;
+
+  private static final String PROJECT = "fakeProject";
+  private static final String INSTANCE = "fakeInstance";
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    emulatorWrapper =
+        new BigtableClientWrapper(PROJECT, INSTANCE, BIGTABLE_EMULATOR.getPort(), null);
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    emulatorWrapper.closeSession();
   }
 
   @Test
   public void testCreatesFlatSchemaCorrectly() {
+    final String tableId = "flatTableSchema";
     InMemoryMetaStore metaStore = new InMemoryMetaStore();
     metaStore.registerProvider(new BigtableTableProvider());
 
     BeamSqlCli cli = new BeamSqlCli().metaStore(metaStore);
-    cli.execute(createFlatTableString("flatTable"));
+    cli.execute(createFlatTableString(tableId, location(tableId)));
 
-    Table table = metaStore.getTables().get("flatTable");
+    Table table = metaStore.getTables().get(tableId);
     assertNotNull(table);
     assertEquals(TEST_FLAT_SCHEMA, table.getSchema());
 
@@ -92,19 +103,13 @@ public class BigtableTableFlatTest extends BigtableTableTest {
   }
 
   @Test
-  public void testSimpleSelectFlat() throws Exception {
-    createReadTable("flatTable");
+  public void testSimpleSelectFlat() {
+    final String tableId = "flatTable";
+    createReadTable(tableId, emulatorWrapper);
     BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new BigtableTableProvider());
-    sqlEnv.executeDdl(createFlatTableString("flatTable"));
+    sqlEnv.executeDdl(createFlatTableString(tableId, location(tableId)));
 
-    String query =
-        "SELECT \n"
-            + "  key, \n"
-            + "  boolColumn, \n"
-            + "  longColumn, \n"
-            + "  stringColumn, \n"
-            + "  doubleColumn \n"
-            + "FROM flatTable";
+    String query = "SELECT key, boolColumn, longColumn, stringColumn, doubleColumn FROM flatTable";
 
     sqlEnv.parseQuery(query);
     PCollection<Row> queryOutput =
@@ -112,15 +117,16 @@ public class BigtableTableFlatTest extends BigtableTableTest {
 
     assertThat(queryOutput.getSchema(), equalTo(TEST_FLAT_SCHEMA));
 
-    PAssert.that(queryOutput).containsInAnyOrder(row(KEY1), row(KEY2));
+    PAssert.that(queryOutput).containsInAnyOrder(flatRow(KEY1), flatRow(KEY2));
     readPipeline.run().waitUntilFinish();
   }
 
   @Test
-  public void testSelectFlatKeyRegexQuery() throws Exception {
-    createReadTable("regexTable");
+  public void testSelectFlatKeyRegexQuery() {
+    final String tableId = "regexTable";
+    createReadTable(tableId, emulatorWrapper);
     BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new BigtableTableProvider());
-    sqlEnv.executeDdl(createFlatTableString("regexTable"));
+    sqlEnv.executeDdl(createFlatTableString(tableId, location(tableId)));
 
     String query = "SELECT key FROM regexTable WHERE key LIKE '^key[0134]{1}'";
 
@@ -136,9 +142,10 @@ public class BigtableTableFlatTest extends BigtableTableTest {
 
   @Test
   public void testSimpleInsert() {
-    createTable("beamWriteTable");
+    final String tableId = "beamWriteTable";
+    emulatorWrapper.createTable(tableId, FAMILY_TEST);
     BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new BigtableTableProvider());
-    sqlEnv.executeDdl(createFlatTableString("beamWriteTable"));
+    sqlEnv.executeDdl(createFlatTableString(tableId, location(tableId)));
 
     String query =
         "INSERT INTO beamWriteTable(key, boolColumn, longColumn, stringColumn, doubleColumn) "
@@ -149,13 +156,17 @@ public class BigtableTableFlatTest extends BigtableTableTest {
 
     PCollection<com.google.bigtable.v2.Row> bigTableRows =
         readPipeline
-            .apply(readTransform("beamWriteTable"))
+            .apply(readTransform(tableId))
             .apply(MapElements.via(new ReplaceCellTimestamp()));
 
     PAssert.that(bigTableRows).containsInAnyOrder(bigTableRow());
     readPipeline.run().waitUntilFinish();
   }
 
+  private String location(String tableId) {
+    return BigtableTableTestUtils.location(PROJECT, INSTANCE, tableId, BIGTABLE_EMULATOR.getPort());
+  }
+
   private Schema filterSchema() {
     return Schema.builder().addStringField(KEY).build();
   }
@@ -172,16 +183,7 @@ public class BigtableTableFlatTest extends BigtableTableTest {
     }
   }
 
-  private String columnsMappingString() {
-    return "familyTest:boolColumn,familyTest:longColumn,familyTest:doubleColumn,"
-        + "familyTest:stringColumn";
-  }
-
-  private static Row row(String key) {
-    return Row.withSchema(TEST_FLAT_SCHEMA).attachValues(key, false, 2L, "string2", 2.20);
-  }
-
-  private static BigtableIO.Read readTransform(String table) {
+  private BigtableIO.Read readTransform(String table) {
     return BigtableIO.read()
         .withProjectId("fakeProject")
         .withInstanceId("fakeInstance")
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableIT.java
new file mode 100644
index 0000000..5b86e15
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableIT.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.bigtable;
+
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.FAMILY_TEST;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.NOW;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.TEST_FLAT_SCHEMA;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.createFlatTableString;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.createFullTableString;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.expectedFullSchema;
+import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.TIMESTAMP_MICROS;
+
+import com.google.auth.Credentials;
+import com.google.cloud.bigtable.emulator.v2.Emulator;
+import java.util.UUID;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
+
+@RunWith(JUnit4.class)
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class BigtableTableIT {
+  private static BigtableTestOptions options;
+  private static BigtableClientWrapper clientWrapper;
+  private static final String TABLE_ID = "Beam" + UUID.randomUUID();
+  private static Emulator emulator;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    PipelineOptionsFactory.register(BigtableTestOptions.class);
+    options = TestPipeline.testingPipelineOptions().as(BigtableTestOptions.class);
+
+    if (options.isWithEmulator()) {
+      emulator = Emulator.createBundled();
+      emulator.start();
+    }
+    Credentials credentials =
+        options.isWithEmulator() ? null : options.as(GcpOptions.class).getGcpCredential();
+    Integer emulatorPort = options.isWithEmulator() ? emulator.getPort() : null;
+
+    clientWrapper =
+        new BigtableClientWrapper(
+            options.getBigtableProject(), options.getInstanceId(), emulatorPort, credentials);
+
+    clientWrapper.createTable(TABLE_ID, FAMILY_TEST);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    clientWrapper.deleteTable(TABLE_ID);
+    clientWrapper.closeSession();
+    if (emulator != null) {
+      emulator.stop();
+    }
+  }
+
+  @Test
+  public void testWriteThenRead() {
+    writeData();
+    readFlatData();
+    readData();
+  }
+
+  private void writeData() {
+    Pipeline p = Pipeline.create(options);
+    BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new BigtableTableProvider());
+    sqlEnv.executeDdl(createFlatTableString(TABLE_ID, location()));
+
+    String query =
+        String.format(
+            "INSERT INTO `%s`(key, boolColumn, longColumn, stringColumn, doubleColumn) "
+                + "VALUES ('key1', FALSE, 1, 'string1', 1.0)",
+            TABLE_ID);
+
+    BeamSqlRelUtils.toPCollection(p, sqlEnv.parseQuery(query));
+    p.run().waitUntilFinish();
+  }
+
+  private void readFlatData() {
+    Pipeline p = Pipeline.create(options);
+    BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new BigtableTableProvider());
+    sqlEnv.executeDdl(createFlatTableString(TABLE_ID, location()));
+    String query = "SELECT * FROM `" + TABLE_ID + "`";
+
+    PCollection<Row> flatRows = BeamSqlRelUtils.toPCollection(p, sqlEnv.parseQuery(query));
+
+    PAssert.that(flatRows).containsInAnyOrder(expectedFlatRow(1));
+    p.run().waitUntilFinish();
+  }
+
+  private void readData() {
+    Pipeline p = Pipeline.create(options);
+    BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new BigtableTableProvider());
+    sqlEnv.executeDdl(createFullTableString(TABLE_ID, location()));
+    String query =
+        String.format(
+            "SELECT key, "
+                + "  t.familyTest.boolColumn, "
+                + "  t.familyTest.longColumn.val AS longValue, "
+                + "  t.familyTest.longColumn.timestampMicros, "
+                + "  t.familyTest.longColumn.labels, "
+                + "  t.familyTest.stringColumn, "
+                + "  t.familyTest.doubleColumn "
+                + "FROM `%s` t",
+            TABLE_ID);
+
+    PCollection<Row> rows =
+        BeamSqlRelUtils.toPCollection(p, sqlEnv.parseQuery(query))
+            .apply(MapElements.via(new ReplaceTimestamp()))
+            .setRowSchema(expectedFullSchema());
+
+    PAssert.that(rows).containsInAnyOrder(expectedFullRow(1));
+    p.run().waitUntilFinish();
+  }
+
+  private Row expectedFullRow(int i) {
+    return Row.withSchema(expectedFullSchema())
+        .attachValues(
+            "key" + i,
+            i % 2 == 0,
+            (long) i,
+            NOW,
+            ImmutableList.of(),
+            ImmutableList.of("string" + i),
+            (double) i);
+  }
+
+  private Row expectedFlatRow(int i) {
+    return Row.withSchema(TEST_FLAT_SCHEMA)
+        .attachValues("key" + i, i % 2 == 0, (long) i, "string" + i, (double) i);
+  }
+
+  private static class ReplaceTimestamp extends SimpleFunction<Row, Row> {
+    @Override
+    public Row apply(Row input) {
+      return Row.fromRow(input).withFieldValue(TIMESTAMP_MICROS, NOW).build();
+    }
+  }
+
+  private String location() {
+    Integer emulatorPort = options.isWithEmulator() ? emulator.getPort() : null;
+    return BigtableTableTestUtils.location(
+        options.getBigtableProject(), options.getInstanceId(), TABLE_ID, emulatorPort);
+  }
+
+  /** Properties needed when using Bigtable with the Beam SDK. */
+  public interface BigtableTestOptions extends TestPipelineOptions {
+    @Description("Instance ID for Bigtable")
+    @Default.String("fakeInstance")
+    String getInstanceId();
+
+    void setInstanceId(String value);
+
+    @Description("Project for Bigtable")
+    @Default.String("fakeProject")
+    String getBigtableProject();
+
+    void setBigtableProject(String value);
+
+    @Description("Whether to use emulator")
+    @Default.Boolean(true)
+    Boolean isWithEmulator();
+
+    void setWithEmulator(Boolean value);
+  }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableTest.java
deleted file mode 100644
index 94d1df2..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.meta.provider.bigtable;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.BINARY_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.BOOL_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.DOUBLE_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.FAMILY_TEST;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.KEY1;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.KEY2;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.LATER;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.LONG_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.NOW;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.STRING_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.booleanToByteArray;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.doubleToByteArray;
-
-import com.google.cloud.bigtable.emulator.v2.BigtableEmulatorRule;
-import org.apache.beam.sdk.io.gcp.testing.BigtableEmulatorWrapper;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
-
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-public abstract class BigtableTableTest {
-
-  @ClassRule
-  public static final BigtableEmulatorRule BIGTABLE_EMULATOR = BigtableEmulatorRule.create();
-
-  @Rule public transient TestPipeline readPipeline = TestPipeline.create();
-
-  private static BigtableEmulatorWrapper emulatorWrapper;
-
-  @BeforeClass
-  public static void setUp() throws Exception {
-    emulatorWrapper =
-        new BigtableEmulatorWrapper(BIGTABLE_EMULATOR.getPort(), "fakeProject", "fakeInstance");
-  }
-
-  protected static void createTable(String table) {
-    emulatorWrapper.createTable(table, FAMILY_TEST);
-  }
-
-  protected static void createReadTable(String table) throws Exception {
-    createTable(table);
-    writeRow(KEY1, table);
-    writeRow(KEY2, table);
-  }
-
-  protected static String getLocation(String table) {
-    return String.format(
-        "localhost:%s/bigtable/projects/fakeProject/instances/fakeInstance/tables/%s",
-        BIGTABLE_EMULATOR.getPort(), table);
-  }
-
-  private static void writeRow(String key, String table) throws Exception {
-    emulatorWrapper.writeRow(key, table, FAMILY_TEST, BOOL_COLUMN, booleanToByteArray(true), NOW);
-    emulatorWrapper.writeRow(
-        key, table, FAMILY_TEST, BOOL_COLUMN, booleanToByteArray(false), LATER);
-    emulatorWrapper.writeRow(
-        key, table, FAMILY_TEST, STRING_COLUMN, "string1".getBytes(UTF_8), NOW);
-    emulatorWrapper.writeRow(
-        key, table, FAMILY_TEST, STRING_COLUMN, "string2".getBytes(UTF_8), LATER);
-    emulatorWrapper.writeRow(key, table, FAMILY_TEST, LONG_COLUMN, Longs.toByteArray(1L), NOW);
-    emulatorWrapper.writeRow(key, table, FAMILY_TEST, LONG_COLUMN, Longs.toByteArray(2L), LATER);
-    emulatorWrapper.writeRow(key, table, FAMILY_TEST, DOUBLE_COLUMN, doubleToByteArray(1.10), NOW);
-    emulatorWrapper.writeRow(
-        key, table, FAMILY_TEST, DOUBLE_COLUMN, doubleToByteArray(2.20), LATER);
-    emulatorWrapper.writeRow(
-        key, table, FAMILY_TEST, BINARY_COLUMN, "blob1".getBytes(UTF_8), LATER);
-    emulatorWrapper.writeRow(
-        key, table, FAMILY_TEST, BINARY_COLUMN, "blob2".getBytes(UTF_8), LATER);
-  }
-}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableTestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableTestUtils.java
new file mode 100644
index 0000000..4c466fa
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableTestUtils.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.bigtable;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.toList;
+import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.KEY;
+import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.LABELS;
+import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.TIMESTAMP_MICROS;
+import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.VALUE;
+import static org.apache.beam.sdk.io.gcp.testing.BigtableUtils.booleanToByteArray;
+import static org.apache.beam.sdk.io.gcp.testing.BigtableUtils.byteString;
+import static org.apache.beam.sdk.io.gcp.testing.BigtableUtils.byteStringUtf8;
+import static org.apache.beam.sdk.io.gcp.testing.BigtableUtils.doubleToByteArray;
+import static org.apache.beam.sdk.io.gcp.testing.BigtableUtils.longToByteArray;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.fail;
+
+import com.google.bigtable.v2.Cell;
+import com.google.bigtable.v2.Column;
+import com.google.bigtable.v2.Family;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+class BigtableTableTestUtils {
+
+  static final String KEY1 = "key1";
+  static final String KEY2 = "key2";
+
+  static final String BOOL_COLUMN = "boolColumn";
+  static final String LONG_COLUMN = "longColumn";
+  static final String STRING_COLUMN = "stringColumn";
+  static final String DOUBLE_COLUMN = "doubleColumn";
+  static final String FAMILY_TEST = "familyTest";
+
+  static final Schema LONG_COLUMN_SCHEMA =
+      Schema.builder()
+          .addInt64Field(VALUE)
+          .addInt64Field(TIMESTAMP_MICROS)
+          .addArrayField(LABELS, Schema.FieldType.STRING)
+          .build();
+
+  static final Schema TEST_FAMILY_SCHEMA =
+      Schema.builder()
+          .addBooleanField(BOOL_COLUMN)
+          .addRowField(LONG_COLUMN, LONG_COLUMN_SCHEMA)
+          .addArrayField(STRING_COLUMN, Schema.FieldType.STRING)
+          .addDoubleField(DOUBLE_COLUMN)
+          .build();
+
+  static final Schema TEST_SCHEMA =
+      Schema.builder().addStringField(KEY).addRowField(FAMILY_TEST, TEST_FAMILY_SCHEMA).build();
+
+  static final Schema TEST_FLAT_SCHEMA =
+      Schema.builder()
+          .addStringField(KEY)
+          .addBooleanField(BOOL_COLUMN)
+          .addInt64Field(LONG_COLUMN)
+          .addStringField(STRING_COLUMN)
+          .addDoubleField(DOUBLE_COLUMN)
+          .build();
+
+  static final long NOW = 5_000_000_000L;
+  static final long LATER = NOW + 1_000L;
+
+  static String createFlatTableString(String table, String location) {
+    return String.format(
+        "CREATE EXTERNAL TABLE `%s`( \n"
+            + "  key VARCHAR NOT NULL, \n"
+            + "  boolColumn BOOLEAN NOT NULL, \n"
+            + "  longColumn BIGINT NOT NULL, \n"
+            + "  stringColumn VARCHAR NOT NULL, \n"
+            + "  doubleColumn DOUBLE NOT NULL \n"
+            + ") \n"
+            + "TYPE bigtable \n"
+            + "LOCATION '%s' \n"
+            + "TBLPROPERTIES '{ \n"
+            + "  \"columnsMapping\": \"%s\"}'",
+        table, location, columnsMappingString());
+  }
+
+  static String createFullTableString(String tableId, String location) {
+    return String.format(
+        "CREATE EXTERNAL TABLE `%s`( \n"
+            + "  key VARCHAR NOT NULL, \n"
+            + "  familyTest ROW< \n"
+            + "    boolColumn BOOLEAN NOT NULL, \n"
+            + "    longColumn ROW< \n"
+            + "      val BIGINT NOT NULL, \n"
+            + "      timestampMicros BIGINT NOT NULL, \n"
+            + "      labels ARRAY<VARCHAR> NOT NULL \n"
+            + "    > NOT NULL, \n"
+            + "    stringColumn ARRAY<VARCHAR> NOT NULL, \n"
+            + "    doubleColumn DOUBLE NOT NULL \n"
+            + "  > NOT NULL \n"
+            + ") \n"
+            + "TYPE bigtable \n"
+            + "LOCATION '%s'",
+        tableId, location);
+  }
+
+  static Schema expectedFullSchema() {
+    return Schema.builder()
+        .addStringField(KEY)
+        .addBooleanField(BOOL_COLUMN)
+        .addInt64Field("longValue")
+        .addInt64Field(TIMESTAMP_MICROS)
+        .addArrayField(LABELS, Schema.FieldType.STRING)
+        .addArrayField(STRING_COLUMN, Schema.FieldType.STRING)
+        .addDoubleField(DOUBLE_COLUMN)
+        .build();
+  }
+
+  static Row expectedFullRow(String key) {
+    return Row.withSchema(expectedFullSchema())
+        .attachValues(
+            key,
+            false,
+            2L,
+            LATER,
+            ImmutableList.of(),
+            ImmutableList.of("string1", "string2"),
+            2.20);
+  }
+
+  static Row flatRow(String key) {
+    return Row.withSchema(TEST_FLAT_SCHEMA).attachValues(key, false, 2L, "string2", 2.20);
+  }
+
+  static String location(
+      String project, String instanceId, String tableId, @Nullable Integer emulatorPort) {
+    String host = emulatorPort == null ? "googleapis.com" : "localhost:" + emulatorPort;
+    return String.format(
+        "%s/bigtable/projects/%s/instances/%s/tables/%s", host, project, instanceId, tableId);
+  }
+
+  static String columnsMappingString() {
+    return "familyTest:boolColumn,familyTest:longColumn,familyTest:doubleColumn,"
+        + "familyTest:stringColumn";
+  }
+
+  static void createReadTable(String table, BigtableClientWrapper clientWrapper) {
+    clientWrapper.createTable(table, FAMILY_TEST);
+    writeRow(KEY1, table, clientWrapper);
+    writeRow(KEY2, table, clientWrapper);
+  }
+
+  static com.google.bigtable.v2.Row bigTableRow() {
+    List<Column> columns =
+        ImmutableList.of(
+            column("boolColumn", booleanToByteArray(true)),
+            column("doubleColumn", doubleToByteArray(5.5)),
+            column("longColumn", Longs.toByteArray(10L)),
+            column("stringColumn", "stringValue".getBytes(UTF_8)));
+    Family family = Family.newBuilder().setName("familyTest").addAllColumns(columns).build();
+    return com.google.bigtable.v2.Row.newBuilder()
+        .setKey(byteStringUtf8("key"))
+        .addFamilies(family)
+        .build();
+  }
+
+  // There is no possibility to insert a value with fixed timestamp so we have to replace it
+  // for the testing purpose.
+  static com.google.bigtable.v2.Row setFixedTimestamp(com.google.bigtable.v2.Row row) {
+    Family family = row.getFamilies(0);
+
+    List<Column> columnsReplaced =
+        family.getColumnsList().stream()
+            .map(
+                column -> {
+                  Cell cell = column.getCells(0);
+                  return column(
+                      column.getQualifier().toStringUtf8(), cell.getValue().toByteArray());
+                })
+            .collect(toList());
+    Family familyReplaced =
+        Family.newBuilder().setName(family.getName()).addAllColumns(columnsReplaced).build();
+    return com.google.bigtable.v2.Row.newBuilder()
+        .setKey(row.getKey())
+        .addFamilies(familyReplaced)
+        .build();
+  }
+
+  static void checkMessage(@Nullable String message, String substring) {
+    if (message != null) {
+      assertThat(message, containsString(substring));
+    } else {
+      fail();
+    }
+  }
+
+  private static Column column(String qualifier, byte[] value) {
+    return Column.newBuilder()
+        .setQualifier(byteStringUtf8(qualifier))
+        .addCells(cell(value))
+        .build();
+  }
+
+  private static Cell cell(byte[] value) {
+    return Cell.newBuilder().setValue(byteString(value)).setTimestampMicros(NOW).build();
+  }
+
+  private static void writeRow(String key, String table, BigtableClientWrapper clientWrapper) {
+    clientWrapper.writeRow(key, table, FAMILY_TEST, BOOL_COLUMN, booleanToByteArray(true), NOW);
+    clientWrapper.writeRow(key, table, FAMILY_TEST, BOOL_COLUMN, booleanToByteArray(false), LATER);
+    clientWrapper.writeRow(key, table, FAMILY_TEST, STRING_COLUMN, "string1".getBytes(UTF_8), NOW);
+    clientWrapper.writeRow(
+        key, table, FAMILY_TEST, STRING_COLUMN, "string2".getBytes(UTF_8), LATER);
+    clientWrapper.writeRow(key, table, FAMILY_TEST, LONG_COLUMN, longToByteArray(1L), NOW);
+    clientWrapper.writeRow(key, table, FAMILY_TEST, LONG_COLUMN, longToByteArray(2L), LATER);
+    clientWrapper.writeRow(key, table, FAMILY_TEST, DOUBLE_COLUMN, doubleToByteArray(1.10), NOW);
+    clientWrapper.writeRow(key, table, FAMILY_TEST, DOUBLE_COLUMN, doubleToByteArray(2.20), LATER);
+  }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableWithRowsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableWithRowsTest.java
index 7b0f2b5..b3d02ef 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableWithRowsTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableWithRowsTest.java
@@ -17,62 +17,65 @@
  */
 package org.apache.beam.sdk.extensions.sql.meta.provider.bigtable;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.stream.Collectors.toList;
-import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.KEY;
-import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.LABELS;
-import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.TIMESTAMP_MICROS;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.BINARY_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.BOOL_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.DOUBLE_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.KEY1;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.KEY2;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.LATER;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.STRING_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.TEST_SCHEMA;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.KEY1;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.KEY2;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.STRING_COLUMN;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.TEST_SCHEMA;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.createFullTableString;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.createReadTable;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.expectedFullRow;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.expectedFullSchema;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
+import com.google.cloud.bigtable.emulator.v2.BigtableEmulatorRule;
+import java.io.IOException;
 import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
-import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
 
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
-public class BigtableTableWithRowsTest extends BigtableTableTest {
-
-  private String createTableString() {
-    return "CREATE EXTERNAL TABLE beamTable( \n"
-        + "  key VARCHAR NOT NULL, \n"
-        + "  familyTest ROW< \n"
-        + "    boolColumn BOOLEAN NOT NULL, \n"
-        + "    longColumn ROW< \n"
-        + "      val BIGINT NOT NULL, \n"
-        + "      timestampMicros BIGINT NOT NULL, \n"
-        + "      labels ARRAY<VARCHAR> NOT NULL \n"
-        + "    > NOT NULL, \n"
-        + "    stringColumn ARRAY<VARCHAR> NOT NULL, \n"
-        + "    doubleColumn DOUBLE NOT NULL, \n"
-        + "    binaryColumn BINARY NOT NULL \n"
-        + "  > NOT NULL \n"
-        + ") \n"
-        + "TYPE bigtable \n"
-        + "LOCATION '"
-        + getLocation("beamTable")
-        + "'";
+public class BigtableTableWithRowsTest {
+
+  @ClassRule
+  public static final BigtableEmulatorRule BIGTABLE_EMULATOR = BigtableEmulatorRule.create();
+
+  @Rule public TestPipeline readPipeline = TestPipeline.create();
+
+  private static BigtableClientWrapper emulatorWrapper;
+
+  private static final String PROJECT = "fakeProject";
+  private static final String INSTANCE = "fakeInstance";
+  private static final String TABLE = "beamTable";
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    emulatorWrapper =
+        new BigtableClientWrapper("fakeProject", "fakeInstance", BIGTABLE_EMULATOR.getPort(), null);
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    emulatorWrapper.closeSession();
   }
 
   @Test
@@ -81,7 +84,7 @@ public class BigtableTableWithRowsTest extends BigtableTableTest {
     metaStore.registerProvider(new BigtableTableProvider());
 
     BeamSqlCli cli = new BeamSqlCli().metaStore(metaStore);
-    cli.execute(createTableString());
+    cli.execute(createFullTableString(TABLE, location()));
 
     Table table = metaStore.getTables().get("beamTable");
     assertNotNull(table);
@@ -89,59 +92,36 @@ public class BigtableTableWithRowsTest extends BigtableTableTest {
   }
 
   @Test
-  public void testSimpleSelect() throws Exception {
-    createReadTable("beamTable");
+  public void testSimpleSelect() {
+    createReadTable(TABLE, emulatorWrapper);
     BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new BigtableTableProvider());
-    sqlEnv.executeDdl(createTableString());
+    sqlEnv.executeDdl(createFullTableString(TABLE, location()));
     String query =
-        ""
-            + "SELECT key, \n"
+        "SELECT key, \n"
             + "  bt.familyTest.boolColumn, \n"
             + "  bt.familyTest.longColumn.val AS longValue, \n"
             + "  bt.familyTest.longColumn.timestampMicros, \n"
             + "  bt.familyTest.longColumn.labels, \n"
             + "  bt.familyTest.stringColumn, \n"
-            + "  bt.familyTest.doubleColumn, \n"
-            + "  bt.familyTest.binaryColumn \n"
+            + "  bt.familyTest.doubleColumn \n"
             + "FROM beamTable bt";
     sqlEnv.parseQuery(query);
     PCollection<Row> queryOutput =
         BeamSqlRelUtils.toPCollection(readPipeline, sqlEnv.parseQuery(query));
 
-    assertThat(queryOutput.getSchema(), equalTo(expectedSchema()));
+    assertThat(queryOutput.getSchema(), equalTo(expectedFullSchema()));
 
     PCollection<Row> sorted =
-        queryOutput.apply(MapElements.via(new SortByTimestamp())).setRowSchema(expectedSchema());
+        queryOutput
+            .apply(MapElements.via(new SortByTimestamp()))
+            .setRowSchema(expectedFullSchema());
 
-    PAssert.that(sorted)
-        .containsInAnyOrder(row(expectedSchema(), KEY1), row(expectedSchema(), KEY2));
+    PAssert.that(sorted).containsInAnyOrder(expectedFullRow(KEY1), expectedFullRow(KEY2));
     readPipeline.run().waitUntilFinish();
   }
 
-  private static Schema expectedSchema() {
-    return Schema.builder()
-        .addStringField(KEY)
-        .addBooleanField(BOOL_COLUMN)
-        .addInt64Field("longValue")
-        .addInt64Field(TIMESTAMP_MICROS)
-        .addArrayField(LABELS, Schema.FieldType.STRING)
-        .addArrayField(STRING_COLUMN, Schema.FieldType.STRING)
-        .addDoubleField(DOUBLE_COLUMN)
-        .addByteArrayField(BINARY_COLUMN)
-        .build();
-  }
-
-  private static Row row(Schema schema, String key) {
-    return Row.withSchema(schema)
-        .attachValues(
-            key,
-            false,
-            2L,
-            LATER,
-            ImmutableList.of(),
-            ImmutableList.of("string1", "string2"),
-            2.20,
-            "blob2".getBytes(UTF_8));
+  private String location() {
+    return BigtableTableTestUtils.location(PROJECT, INSTANCE, TABLE, BIGTABLE_EMULATOR.getPort());
   }
 
   private static class SortByTimestamp extends SimpleFunction<Row, Row> {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigtableEmulatorWrapper.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigtableEmulatorWrapper.java
deleted file mode 100644
index 2a4b87e..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigtableEmulatorWrapper.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.testing;
-
-import com.google.api.core.ApiFuture;
-import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
-import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
-import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
-import com.google.cloud.bigtable.data.v2.BigtableDataClient;
-import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
-import com.google.cloud.bigtable.data.v2.models.RowMutation;
-import com.google.protobuf.ByteString;
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-
-public class BigtableEmulatorWrapper {
-  private final BigtableTableAdminClient tableAdminClient;
-  private final BigtableDataClient dataClient;
-
-  public BigtableEmulatorWrapper(int emulatorPort, String projectId, String instanceId)
-      throws IOException {
-    BigtableTableAdminSettings.Builder tableAdminSettings =
-        BigtableTableAdminSettings.newBuilderForEmulator(emulatorPort)
-            .setProjectId(projectId)
-            .setInstanceId(instanceId);
-    tableAdminClient = BigtableTableAdminClient.create(tableAdminSettings.build());
-
-    BigtableDataSettings.Builder dataSettings =
-        BigtableDataSettings.newBuilderForEmulator(emulatorPort)
-            .setProjectId(projectId)
-            .setInstanceId(instanceId);
-    dataClient = BigtableDataClient.create(dataSettings.build());
-  }
-
-  public void writeRow(
-      String key,
-      String table,
-      String familyColumn,
-      String columnQualifier,
-      byte[] value,
-      long timestampMicros)
-      throws ExecutionException, InterruptedException {
-    ApiFuture<Void> mutateFuture =
-        dataClient.mutateRowAsync(
-            RowMutation.create(table, key)
-                .setCell(
-                    familyColumn,
-                    ByteString.copyFromUtf8(columnQualifier),
-                    timestampMicros,
-                    ByteString.copyFrom(value)));
-    mutateFuture.get();
-  }
-
-  public void createTable(String tableName, String... families) {
-    CreateTableRequest request = CreateTableRequest.of(tableName);
-    ImmutableList.copyOf(families).forEach(request::addFamily);
-    tableAdminClient.createTable(request);
-  }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigtableTestUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigtableTestUtils.java
deleted file mode 100644
index d9796fd..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigtableTestUtils.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.testing;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.stream.Collectors.toList;
-import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.KEY;
-import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.LABELS;
-import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.TIMESTAMP_MICROS;
-import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.VALUE;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsString;
-import static org.junit.Assert.fail;
-
-import com.google.bigtable.v2.Cell;
-import com.google.bigtable.v2.Column;
-import com.google.bigtable.v2.Family;
-import com.google.protobuf.ByteString;
-import java.util.List;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;
-import org.checkerframework.checker.nullness.qual.Nullable;
-
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-public class BigtableTestUtils {
-
-  public static final String KEY1 = "key1";
-  public static final String KEY2 = "key2";
-
-  public static final String BOOL_COLUMN = "boolColumn";
-  public static final String LONG_COLUMN = "longColumn";
-  public static final String STRING_COLUMN = "stringColumn";
-  public static final String DOUBLE_COLUMN = "doubleColumn";
-  public static final String BINARY_COLUMN = "binaryColumn";
-  public static final String FAMILY_TEST = "familyTest";
-
-  public static final Schema LONG_COLUMN_SCHEMA =
-      Schema.builder()
-          .addInt64Field(VALUE)
-          .addInt64Field(TIMESTAMP_MICROS)
-          .addArrayField(LABELS, Schema.FieldType.STRING)
-          .build();
-
-  public static final Schema TEST_FAMILY_SCHEMA =
-      Schema.builder()
-          .addBooleanField(BOOL_COLUMN)
-          .addRowField(LONG_COLUMN, LONG_COLUMN_SCHEMA)
-          .addArrayField(STRING_COLUMN, Schema.FieldType.STRING)
-          .addDoubleField(DOUBLE_COLUMN)
-          .addByteArrayField(BINARY_COLUMN)
-          .build();
-
-  public static final Schema TEST_SCHEMA =
-      Schema.builder().addStringField(KEY).addRowField(FAMILY_TEST, TEST_FAMILY_SCHEMA).build();
-
-  public static final Schema TEST_FLAT_SCHEMA =
-      Schema.builder()
-          .addStringField(KEY)
-          .addBooleanField(BOOL_COLUMN)
-          .addInt64Field(LONG_COLUMN)
-          .addStringField(STRING_COLUMN)
-          .addDoubleField(DOUBLE_COLUMN)
-          .build();
-
-  public static final long NOW = 5_000_000_000L;
-  public static final long LATER = NOW + 1_000L;
-
-  public static byte[] floatToByteArray(float number) {
-    return Ints.toByteArray(Float.floatToIntBits(number));
-  }
-
-  public static byte[] doubleToByteArray(double number) {
-    return Longs.toByteArray(Double.doubleToLongBits(number));
-  }
-
-  public static byte[] booleanToByteArray(boolean condition) {
-    return condition ? new byte[] {1} : new byte[] {0};
-  }
-
-  public static void checkMessage(@Nullable String message, String substring) {
-    if (message != null) {
-      assertThat(message, containsString(substring));
-    } else {
-      fail();
-    }
-  }
-
-  public static com.google.bigtable.v2.Row bigTableRow() {
-    List<Column> columns =
-        ImmutableList.of(
-            column("boolColumn", booleanToByteArray(true)),
-            column("doubleColumn", doubleToByteArray(5.5)),
-            column("longColumn", Longs.toByteArray(10L)),
-            column("stringColumn", "stringValue".getBytes(UTF_8)));
-    Family family = Family.newBuilder().setName("familyTest").addAllColumns(columns).build();
-    return com.google.bigtable.v2.Row.newBuilder()
-        .setKey(ByteString.copyFromUtf8("key"))
-        .addFamilies(family)
-        .build();
-  }
-
-  // There is no possibility to insert a value with fixed timestamp so we have to replace it
-  // for the testing purpose.
-  public static com.google.bigtable.v2.Row setFixedTimestamp(com.google.bigtable.v2.Row row) {
-    Family family = row.getFamilies(0);
-
-    List<Column> columnsReplaced =
-        family.getColumnsList().stream()
-            .map(
-                column -> {
-                  Cell cell = column.getCells(0);
-                  return column(
-                      column.getQualifier().toStringUtf8(), cell.getValue().toByteArray());
-                })
-            .collect(toList());
-    Family familyReplaced =
-        Family.newBuilder().setName(family.getName()).addAllColumns(columnsReplaced).build();
-    return com.google.bigtable.v2.Row.newBuilder()
-        .setKey(row.getKey())
-        .addFamilies(familyReplaced)
-        .build();
-  }
-
-  private static Column column(String qualifier, byte[] value) {
-    return Column.newBuilder()
-        .setQualifier(ByteString.copyFromUtf8(qualifier))
-        .addCells(cell(value))
-        .build();
-  }
-
-  private static Cell cell(byte[] value) {
-    return Cell.newBuilder().setValue(ByteString.copyFrom(value)).setTimestampMicros(NOW).build();
-  }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigtableUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigtableUtils.java
new file mode 100644
index 0000000..67ed384
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigtableUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.testing;
+
+import com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;
+
+public class BigtableUtils {
+
+  public static ByteString byteString(byte[] bytes) {
+    return ByteString.copyFrom(bytes);
+  }
+
+  public static ByteString byteStringUtf8(String s) {
+    return ByteString.copyFromUtf8(s);
+  }
+
+  public static byte[] floatToByteArray(float number) {
+    return Ints.toByteArray(Float.floatToIntBits(number));
+  }
+
+  public static byte[] longToByteArray(long number) {
+    return Longs.toByteArray(number);
+  }
+
+  public static byte[] doubleToByteArray(double number) {
+    return Longs.toByteArray(Double.doubleToLongBits(number));
+  }
+
+  public static byte[] booleanToByteArray(boolean condition) {
+    return condition ? new byte[] {1} : new byte[] {0};
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BeamRowToBigtableMutationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BeamRowToBigtableMutationTest.java
index 2583ce3..5fa16db 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BeamRowToBigtableMutationTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BeamRowToBigtableMutationTest.java
@@ -18,13 +18,13 @@
 package org.apache.beam.sdk.io.gcp.bigtable;
 
 import static java.util.stream.Collectors.toList;
-import static org.apache.beam.sdk.io.gcp.bigtable.TestUtils.rowMutation;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.BOOL_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.DOUBLE_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.FAMILY_TEST;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.LONG_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.STRING_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.TEST_FLAT_SCHEMA;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.BOOL_COLUMN;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.DOUBLE_COLUMN;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.FAMILY_TEST;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.LONG_COLUMN;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.STRING_COLUMN;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.TEST_FLAT_SCHEMA;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.rowMutation;
 
 import com.google.bigtable.v2.Mutation;
 import com.google.protobuf.ByteString;
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableRowToBeamRowFlatTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableRowToBeamRowFlatTest.java
index f201b3a..ef59c6a 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableRowToBeamRowFlatTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableRowToBeamRowFlatTest.java
@@ -17,13 +17,13 @@
  */
 package org.apache.beam.sdk.io.gcp.bigtable;
 
-import static org.apache.beam.sdk.io.gcp.bigtable.TestUtils.bigtableRow;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.BOOL_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.DOUBLE_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.FAMILY_TEST;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.LONG_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.STRING_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.TEST_FLAT_SCHEMA;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.BOOL_COLUMN;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.DOUBLE_COLUMN;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.FAMILY_TEST;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.LONG_COLUMN;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.STRING_COLUMN;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.TEST_FLAT_SCHEMA;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.bigtableRow;
 
 import java.util.Map;
 import java.util.Set;
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableRowToBeamRowTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableRowToBeamRowTest.java
index 0a683ed..a7b9f6f 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableRowToBeamRowTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableRowToBeamRowTest.java
@@ -18,13 +18,13 @@
 package org.apache.beam.sdk.io.gcp.bigtable;
 
 import static java.util.stream.Collectors.toList;
-import static org.apache.beam.sdk.io.gcp.bigtable.TestUtils.bigtableRow;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.FAMILY_TEST;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.LATER;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.LONG_COLUMN_SCHEMA;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.STRING_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.TEST_FAMILY_SCHEMA;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.TEST_SCHEMA;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.FAMILY_TEST;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.LATER;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.LONG_COLUMN_SCHEMA;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.STRING_COLUMN;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.TEST_FAMILY_SCHEMA;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.TEST_SCHEMA;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.bigtableRow;
 
 import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
 import org.apache.beam.sdk.testing.PAssert;
@@ -71,8 +71,7 @@ public class BigtableRowToBeamRowTest {
             false,
             Row.withSchema(LONG_COLUMN_SCHEMA).attachValues(2L, LATER, ImmutableList.of("label1")),
             ImmutableList.of("value1", "value2"),
-            5.5,
-            new byte[] {2, 1, 0});
+            5.5);
   }
 
   private static class SortStringColumn extends SimpleFunction<Row, Row> {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/TestUtils.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestUtils.java
similarity index 71%
rename from sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/TestUtils.java
rename to sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestUtils.java
index d7345cb..5c5af10 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/TestUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestUtils.java
@@ -18,11 +18,12 @@
 package org.apache.beam.sdk.io.gcp.bigtable;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.FAMILY_TEST;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.LATER;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.NOW;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.booleanToByteArray;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.doubleToByteArray;
+import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.KEY;
+import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.LABELS;
+import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.TIMESTAMP_MICROS;
+import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.VALUE;
+import static org.apache.beam.sdk.io.gcp.testing.BigtableUtils.booleanToByteArray;
+import static org.apache.beam.sdk.io.gcp.testing.BigtableUtils.doubleToByteArray;
 
 import com.google.bigtable.v2.Cell;
 import com.google.bigtable.v2.Column;
@@ -30,11 +31,48 @@ import com.google.bigtable.v2.Family;
 import com.google.bigtable.v2.Mutation;
 import com.google.protobuf.ByteString;
 import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;
 
-public class TestUtils {
+class BigtableTestUtils {
+
+  static final String BOOL_COLUMN = "boolColumn";
+  static final String LONG_COLUMN = "longColumn";
+  static final String STRING_COLUMN = "stringColumn";
+  static final String DOUBLE_COLUMN = "doubleColumn";
+  static final String FAMILY_TEST = "familyTest";
+
+  static final Schema LONG_COLUMN_SCHEMA =
+      Schema.builder()
+          .addInt64Field(VALUE)
+          .addInt64Field(TIMESTAMP_MICROS)
+          .addArrayField(LABELS, Schema.FieldType.STRING)
+          .build();
+
+  static final Schema TEST_FAMILY_SCHEMA =
+      Schema.builder()
+          .addBooleanField(BOOL_COLUMN)
+          .addRowField(LONG_COLUMN, LONG_COLUMN_SCHEMA)
+          .addArrayField(STRING_COLUMN, Schema.FieldType.STRING)
+          .addDoubleField(DOUBLE_COLUMN)
+          .build();
+
+  static final Schema TEST_SCHEMA =
+      Schema.builder().addStringField(KEY).addRowField(FAMILY_TEST, TEST_FAMILY_SCHEMA).build();
+
+  static final Schema TEST_FLAT_SCHEMA =
+      Schema.builder()
+          .addStringField(KEY)
+          .addBooleanField(BOOL_COLUMN)
+          .addInt64Field(LONG_COLUMN)
+          .addStringField(STRING_COLUMN)
+          .addDoubleField(DOUBLE_COLUMN)
+          .build();
+
+  static final long NOW = 5_000_000_000L;
+  static final long LATER = NOW + 1_000L;
 
   static com.google.bigtable.v2.Row bigtableRow(long i) {
     return com.google.bigtable.v2.Row.newBuilder()
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/CellValueParserTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/CellValueParserTest.java
index fccb20d..4bf776b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/CellValueParserTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/CellValueParserTest.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.io.gcp.bigtable;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.checkMessage;
 import static org.apache.beam.sdk.schemas.Schema.TypeName.BOOLEAN;
 import static org.apache.beam.sdk.schemas.Schema.TypeName.BYTE;
 import static org.apache.beam.sdk.schemas.Schema.TypeName.BYTES;
@@ -30,11 +29,15 @@ import static org.apache.beam.sdk.schemas.Schema.TypeName.INT32;
 import static org.apache.beam.sdk.schemas.Schema.TypeName.INT64;
 import static org.apache.beam.sdk.schemas.Schema.TypeName.MAP;
 import static org.apache.beam.sdk.schemas.Schema.TypeName.STRING;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.fail;
 
 import com.google.bigtable.v2.Cell;
 import com.google.protobuf.ByteString;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.junit.Test;
@@ -274,4 +277,12 @@ public class CellValueParserTest {
   private Cell cell(byte[] value) {
     return Cell.newBuilder().setValue(ByteString.copyFrom(value)).build();
   }
+
+  private void checkMessage(@Nullable String message, String substring) {
+    if (message != null) {
+      assertThat(message, containsString(substring));
+    } else {
+      fail();
+    }
+  }
 }