You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2020/12/16 21:32:21 UTC
[beam] branch master updated: [BEAM-11407] Add IT test to Bigtable
for BeamSQL
This is an automated email from the ASF dual-hosted git repository.
amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3cacd4a [BEAM-11407] Add IT test to Bigtable for BeamSQL
new 1a5505b Merge pull request #13512 from piotr-szuberski/bigtable-it
3cacd4a is described below
commit 3cacd4a1acaa5e37b326970aa47a89ee025ad1b4
Author: Piotr Szuberski <pi...@polidea.com>
AuthorDate: Mon Dec 14 09:21:51 2020 +0100
[BEAM-11407] Add IT test to Bigtable for BeamSQL
---
.../provider/bigtable/BigtableClientWrapper.java | 115 ++++++++++
.../BigtableTableCreationFailuresTest.java | 2 +-
.../provider/bigtable/BigtableTableFlatTest.java | 110 +++++-----
.../meta/provider/bigtable/BigtableTableIT.java | 200 +++++++++++++++++
.../meta/provider/bigtable/BigtableTableTest.java | 94 --------
.../provider/bigtable/BigtableTableTestUtils.java | 237 +++++++++++++++++++++
.../bigtable/BigtableTableWithRowsTest.java | 120 +++++------
.../io/gcp/testing/BigtableEmulatorWrapper.java | 75 -------
.../beam/sdk/io/gcp/testing/BigtableTestUtils.java | 153 -------------
.../beam/sdk/io/gcp/testing/BigtableUtils.java | 49 +++++
.../bigtable/BeamRowToBigtableMutationTest.java | 14 +-
.../gcp/bigtable/BigtableRowToBeamRowFlatTest.java | 14 +-
.../io/gcp/bigtable/BigtableRowToBeamRowTest.java | 17 +-
.../{TestUtils.java => BigtableTestUtils.java} | 50 ++++-
.../sdk/io/gcp/bigtable/CellValueParserTest.java | 13 +-
15 files changed, 786 insertions(+), 477 deletions(-)
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableClientWrapper.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableClientWrapper.java
new file mode 100644
index 0000000..6a8b343
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableClientWrapper.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.bigtable;
+
+import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.byteString;
+import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.byteStringUtf8;
+
+import com.google.auth.Credentials;
+import com.google.bigtable.admin.v2.ColumnFamily;
+import com.google.bigtable.admin.v2.DeleteTableRequest;
+import com.google.bigtable.admin.v2.Table;
+import com.google.bigtable.v2.MutateRowRequest;
+import com.google.bigtable.v2.Mutation;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.config.CredentialOptions;
+import com.google.cloud.bigtable.grpc.BigtableDataClient;
+import com.google.cloud.bigtable.grpc.BigtableSession;
+import com.google.cloud.bigtable.grpc.BigtableTableAdminClient;
+import java.io.IOException;
+import java.io.Serializable;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+class BigtableClientWrapper implements Serializable {
+ private final BigtableTableAdminClient tableAdminClient;
+ private final BigtableDataClient dataClient;
+ private final BigtableSession session;
+ private final BigtableOptions bigtableOptions;
+
+ BigtableClientWrapper(
+ String project,
+ String instanceId,
+ @Nullable Integer emulatorPort,
+ @Nullable Credentials gcpCredentials)
+ throws IOException {
+ BigtableOptions.Builder optionsBuilder =
+ BigtableOptions.builder()
+ .setProjectId(project)
+ .setInstanceId(instanceId)
+ .setUserAgent("apache-beam-test");
+ if (emulatorPort != null) {
+ optionsBuilder.enableEmulator("localhost", emulatorPort);
+ }
+ if (gcpCredentials != null) {
+ optionsBuilder.setCredentialOptions(CredentialOptions.credential(gcpCredentials));
+ }
+ bigtableOptions = optionsBuilder.build();
+
+ session = new BigtableSession(bigtableOptions);
+ tableAdminClient = session.getTableAdminClient();
+ dataClient = session.getDataClient();
+ }
+
+ void writeRow(
+ String key,
+ String table,
+ String familyColumn,
+ String columnQualifier,
+ byte[] value,
+ long timestampMicros) {
+ Mutation.SetCell setCell =
+ Mutation.SetCell.newBuilder()
+ .setFamilyName(familyColumn)
+ .setColumnQualifier(byteStringUtf8(columnQualifier))
+ .setValue(byteString(value))
+ .setTimestampMicros(timestampMicros)
+ .build();
+ Mutation mutation = Mutation.newBuilder().setSetCell(setCell).build();
+ MutateRowRequest mutateRowRequest =
+ MutateRowRequest.newBuilder()
+ .setRowKey(byteStringUtf8(key))
+ .setTableName(bigtableOptions.getInstanceName().toTableNameStr(table))
+ .addMutations(mutation)
+ .build();
+ dataClient.mutateRow(mutateRowRequest);
+ }
+
+ void createTable(String tableName, String familyName) {
+ Table.Builder tableBuilder = Table.newBuilder();
+ tableBuilder.putColumnFamilies(familyName, ColumnFamily.newBuilder().build());
+
+ String instanceName = bigtableOptions.getInstanceName().toString();
+ com.google.bigtable.admin.v2.CreateTableRequest.Builder createTableRequestBuilder =
+ com.google.bigtable.admin.v2.CreateTableRequest.newBuilder()
+ .setParent(instanceName)
+ .setTableId(tableName)
+ .setTable(tableBuilder.build());
+ tableAdminClient.createTable(createTableRequestBuilder.build());
+ }
+
+ void deleteTable(String tableId) {
+ final String tableName = bigtableOptions.getInstanceName().toTableNameStr(tableId);
+ DeleteTableRequest.Builder deleteTableRequestBuilder =
+ DeleteTableRequest.newBuilder().setName(tableName);
+ tableAdminClient.deleteTable(deleteTableRequestBuilder.build());
+ }
+
+ void closeSession() throws IOException {
+ session.close();
+ }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableCreationFailuresTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableCreationFailuresTest.java
index 3f6f0a2..04cb0e1 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableCreationFailuresTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableCreationFailuresTest.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.sdk.extensions.sql.meta.provider.bigtable;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.checkMessage;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.checkMessage;
import static org.junit.Assert.assertThrows;
import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java
index b336e43..9ea0ac8 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableFlatTest.java
@@ -17,13 +17,18 @@
*/
package org.apache.beam.sdk.extensions.sql.meta.provider.bigtable;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.FAMILY_TEST;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.KEY1;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.KEY2;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.TEST_FLAT_SCHEMA;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.bigTableRow;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.columnsMappingString;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.createFlatTableString;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.createReadTable;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.flatRow;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.setFixedTimestamp;
import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.COLUMNS_MAPPING;
import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.KEY;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.KEY1;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.KEY2;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.TEST_FLAT_SCHEMA;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.bigTableRow;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.setFixedTimestamp;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
@@ -31,6 +36,8 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import com.alibaba.fastjson.JSONObject;
+import com.google.cloud.bigtable.emulator.v2.BigtableEmulatorRule;
+import java.io.IOException;
import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
@@ -44,45 +51,49 @@ import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
-public class BigtableTableFlatTest extends BigtableTableTest {
+public class BigtableTableFlatTest {
+ @ClassRule
+ public static final BigtableEmulatorRule BIGTABLE_EMULATOR = BigtableEmulatorRule.create();
+
+ @Rule public TestPipeline readPipeline = TestPipeline.create();
@Rule public TestPipeline writePipeline = TestPipeline.create();
- private String createFlatTableString(String table) {
- return "CREATE EXTERNAL TABLE "
- + table
- + "( \n"
- + " key VARCHAR NOT NULL, \n"
- + " boolColumn BOOLEAN NOT NULL, \n"
- + " longColumn BIGINT NOT NULL, \n"
- + " stringColumn VARCHAR NOT NULL, \n"
- + " doubleColumn DOUBLE NOT NULL \n"
- + ") \n"
- + "TYPE bigtable \n"
- + "LOCATION '"
- + getLocation(table)
- + "' \n"
- + "TBLPROPERTIES '{ \n"
- + " \"columnsMapping\": \""
- + columnsMappingString()
- + "\"}'";
+ private static BigtableClientWrapper emulatorWrapper;
+
+ private static final String PROJECT = "fakeProject";
+ private static final String INSTANCE = "fakeInstance";
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ emulatorWrapper =
+ new BigtableClientWrapper(PROJECT, INSTANCE, BIGTABLE_EMULATOR.getPort(), null);
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ emulatorWrapper.closeSession();
}
@Test
public void testCreatesFlatSchemaCorrectly() {
+ final String tableId = "flatTableSchema";
InMemoryMetaStore metaStore = new InMemoryMetaStore();
metaStore.registerProvider(new BigtableTableProvider());
BeamSqlCli cli = new BeamSqlCli().metaStore(metaStore);
- cli.execute(createFlatTableString("flatTable"));
+ cli.execute(createFlatTableString(tableId, location(tableId)));
- Table table = metaStore.getTables().get("flatTable");
+ Table table = metaStore.getTables().get(tableId);
assertNotNull(table);
assertEquals(TEST_FLAT_SCHEMA, table.getSchema());
@@ -92,19 +103,13 @@ public class BigtableTableFlatTest extends BigtableTableTest {
}
@Test
- public void testSimpleSelectFlat() throws Exception {
- createReadTable("flatTable");
+ public void testSimpleSelectFlat() {
+ final String tableId = "flatTable";
+ createReadTable(tableId, emulatorWrapper);
BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new BigtableTableProvider());
- sqlEnv.executeDdl(createFlatTableString("flatTable"));
+ sqlEnv.executeDdl(createFlatTableString(tableId, location(tableId)));
- String query =
- "SELECT \n"
- + " key, \n"
- + " boolColumn, \n"
- + " longColumn, \n"
- + " stringColumn, \n"
- + " doubleColumn \n"
- + "FROM flatTable";
+ String query = "SELECT key, boolColumn, longColumn, stringColumn, doubleColumn FROM flatTable";
sqlEnv.parseQuery(query);
PCollection<Row> queryOutput =
@@ -112,15 +117,16 @@ public class BigtableTableFlatTest extends BigtableTableTest {
assertThat(queryOutput.getSchema(), equalTo(TEST_FLAT_SCHEMA));
- PAssert.that(queryOutput).containsInAnyOrder(row(KEY1), row(KEY2));
+ PAssert.that(queryOutput).containsInAnyOrder(flatRow(KEY1), flatRow(KEY2));
readPipeline.run().waitUntilFinish();
}
@Test
- public void testSelectFlatKeyRegexQuery() throws Exception {
- createReadTable("regexTable");
+ public void testSelectFlatKeyRegexQuery() {
+ final String tableId = "regexTable";
+ createReadTable(tableId, emulatorWrapper);
BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new BigtableTableProvider());
- sqlEnv.executeDdl(createFlatTableString("regexTable"));
+ sqlEnv.executeDdl(createFlatTableString(tableId, location(tableId)));
String query = "SELECT key FROM regexTable WHERE key LIKE '^key[0134]{1}'";
@@ -136,9 +142,10 @@ public class BigtableTableFlatTest extends BigtableTableTest {
@Test
public void testSimpleInsert() {
- createTable("beamWriteTable");
+ final String tableId = "beamWriteTable";
+ emulatorWrapper.createTable(tableId, FAMILY_TEST);
BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new BigtableTableProvider());
- sqlEnv.executeDdl(createFlatTableString("beamWriteTable"));
+ sqlEnv.executeDdl(createFlatTableString(tableId, location(tableId)));
String query =
"INSERT INTO beamWriteTable(key, boolColumn, longColumn, stringColumn, doubleColumn) "
@@ -149,13 +156,17 @@ public class BigtableTableFlatTest extends BigtableTableTest {
PCollection<com.google.bigtable.v2.Row> bigTableRows =
readPipeline
- .apply(readTransform("beamWriteTable"))
+ .apply(readTransform(tableId))
.apply(MapElements.via(new ReplaceCellTimestamp()));
PAssert.that(bigTableRows).containsInAnyOrder(bigTableRow());
readPipeline.run().waitUntilFinish();
}
+ private String location(String tableId) {
+ return BigtableTableTestUtils.location(PROJECT, INSTANCE, tableId, BIGTABLE_EMULATOR.getPort());
+ }
+
private Schema filterSchema() {
return Schema.builder().addStringField(KEY).build();
}
@@ -172,16 +183,7 @@ public class BigtableTableFlatTest extends BigtableTableTest {
}
}
- private String columnsMappingString() {
- return "familyTest:boolColumn,familyTest:longColumn,familyTest:doubleColumn,"
- + "familyTest:stringColumn";
- }
-
- private static Row row(String key) {
- return Row.withSchema(TEST_FLAT_SCHEMA).attachValues(key, false, 2L, "string2", 2.20);
- }
-
- private static BigtableIO.Read readTransform(String table) {
+ private BigtableIO.Read readTransform(String table) {
return BigtableIO.read()
.withProjectId("fakeProject")
.withInstanceId("fakeInstance")
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableIT.java
new file mode 100644
index 0000000..5b86e15
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableIT.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.bigtable;
+
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.FAMILY_TEST;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.NOW;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.TEST_FLAT_SCHEMA;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.createFlatTableString;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.createFullTableString;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.expectedFullSchema;
+import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.TIMESTAMP_MICROS;
+
+import com.google.auth.Credentials;
+import com.google.cloud.bigtable.emulator.v2.Emulator;
+import java.util.UUID;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
+
+@RunWith(JUnit4.class)
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class BigtableTableIT {
+ private static BigtableTestOptions options;
+ private static BigtableClientWrapper clientWrapper;
+ private static final String TABLE_ID = "Beam" + UUID.randomUUID();
+ private static Emulator emulator;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ PipelineOptionsFactory.register(BigtableTestOptions.class);
+ options = TestPipeline.testingPipelineOptions().as(BigtableTestOptions.class);
+
+ if (options.isWithEmulator()) {
+ emulator = Emulator.createBundled();
+ emulator.start();
+ }
+ Credentials credentials =
+ options.isWithEmulator() ? null : options.as(GcpOptions.class).getGcpCredential();
+ Integer emulatorPort = options.isWithEmulator() ? emulator.getPort() : null;
+
+ clientWrapper =
+ new BigtableClientWrapper(
+ options.getBigtableProject(), options.getInstanceId(), emulatorPort, credentials);
+
+ clientWrapper.createTable(TABLE_ID, FAMILY_TEST);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ clientWrapper.deleteTable(TABLE_ID);
+ clientWrapper.closeSession();
+ if (emulator != null) {
+ emulator.stop();
+ }
+ }
+
+ @Test
+ public void testWriteThenRead() {
+ writeData();
+ readFlatData();
+ readData();
+ }
+
+ private void writeData() {
+ Pipeline p = Pipeline.create(options);
+ BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new BigtableTableProvider());
+ sqlEnv.executeDdl(createFlatTableString(TABLE_ID, location()));
+
+ String query =
+ String.format(
+ "INSERT INTO `%s`(key, boolColumn, longColumn, stringColumn, doubleColumn) "
+ + "VALUES ('key1', FALSE, 1, 'string1', 1.0)",
+ TABLE_ID);
+
+ BeamSqlRelUtils.toPCollection(p, sqlEnv.parseQuery(query));
+ p.run().waitUntilFinish();
+ }
+
+ private void readFlatData() {
+ Pipeline p = Pipeline.create(options);
+ BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new BigtableTableProvider());
+ sqlEnv.executeDdl(createFlatTableString(TABLE_ID, location()));
+ String query = "SELECT * FROM `" + TABLE_ID + "`";
+
+ PCollection<Row> flatRows = BeamSqlRelUtils.toPCollection(p, sqlEnv.parseQuery(query));
+
+ PAssert.that(flatRows).containsInAnyOrder(expectedFlatRow(1));
+ p.run().waitUntilFinish();
+ }
+
+ private void readData() {
+ Pipeline p = Pipeline.create(options);
+ BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new BigtableTableProvider());
+ sqlEnv.executeDdl(createFullTableString(TABLE_ID, location()));
+ String query =
+ String.format(
+ "SELECT key, "
+ + " t.familyTest.boolColumn, "
+ + " t.familyTest.longColumn.val AS longValue, "
+ + " t.familyTest.longColumn.timestampMicros, "
+ + " t.familyTest.longColumn.labels, "
+ + " t.familyTest.stringColumn, "
+ + " t.familyTest.doubleColumn "
+ + "FROM `%s` t",
+ TABLE_ID);
+
+ PCollection<Row> rows =
+ BeamSqlRelUtils.toPCollection(p, sqlEnv.parseQuery(query))
+ .apply(MapElements.via(new ReplaceTimestamp()))
+ .setRowSchema(expectedFullSchema());
+
+ PAssert.that(rows).containsInAnyOrder(expectedFullRow(1));
+ p.run().waitUntilFinish();
+ }
+
+ private Row expectedFullRow(int i) {
+ return Row.withSchema(expectedFullSchema())
+ .attachValues(
+ "key" + i,
+ i % 2 == 0,
+ (long) i,
+ NOW,
+ ImmutableList.of(),
+ ImmutableList.of("string" + i),
+ (double) i);
+ }
+
+ private Row expectedFlatRow(int i) {
+ return Row.withSchema(TEST_FLAT_SCHEMA)
+ .attachValues("key" + i, i % 2 == 0, (long) i, "string" + i, (double) i);
+ }
+
+ private static class ReplaceTimestamp extends SimpleFunction<Row, Row> {
+ @Override
+ public Row apply(Row input) {
+ return Row.fromRow(input).withFieldValue(TIMESTAMP_MICROS, NOW).build();
+ }
+ }
+
+ private String location() {
+ Integer emulatorPort = options.isWithEmulator() ? emulator.getPort() : null;
+ return BigtableTableTestUtils.location(
+ options.getBigtableProject(), options.getInstanceId(), TABLE_ID, emulatorPort);
+ }
+
+ /** Properties needed when using Bigtable with the Beam SDK. */
+ public interface BigtableTestOptions extends TestPipelineOptions {
+ @Description("Instance ID for Bigtable")
+ @Default.String("fakeInstance")
+ String getInstanceId();
+
+ void setInstanceId(String value);
+
+ @Description("Project for Bigtable")
+ @Default.String("fakeProject")
+ String getBigtableProject();
+
+ void setBigtableProject(String value);
+
+ @Description("Whether to use emulator")
+ @Default.Boolean(true)
+ Boolean isWithEmulator();
+
+ void setWithEmulator(Boolean value);
+ }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableTest.java
deleted file mode 100644
index 94d1df2..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.meta.provider.bigtable;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.BINARY_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.BOOL_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.DOUBLE_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.FAMILY_TEST;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.KEY1;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.KEY2;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.LATER;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.LONG_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.NOW;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.STRING_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.booleanToByteArray;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.doubleToByteArray;
-
-import com.google.cloud.bigtable.emulator.v2.BigtableEmulatorRule;
-import org.apache.beam.sdk.io.gcp.testing.BigtableEmulatorWrapper;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
-
-@SuppressWarnings({
- "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-public abstract class BigtableTableTest {
-
- @ClassRule
- public static final BigtableEmulatorRule BIGTABLE_EMULATOR = BigtableEmulatorRule.create();
-
- @Rule public transient TestPipeline readPipeline = TestPipeline.create();
-
- private static BigtableEmulatorWrapper emulatorWrapper;
-
- @BeforeClass
- public static void setUp() throws Exception {
- emulatorWrapper =
- new BigtableEmulatorWrapper(BIGTABLE_EMULATOR.getPort(), "fakeProject", "fakeInstance");
- }
-
- protected static void createTable(String table) {
- emulatorWrapper.createTable(table, FAMILY_TEST);
- }
-
- protected static void createReadTable(String table) throws Exception {
- createTable(table);
- writeRow(KEY1, table);
- writeRow(KEY2, table);
- }
-
- protected static String getLocation(String table) {
- return String.format(
- "localhost:%s/bigtable/projects/fakeProject/instances/fakeInstance/tables/%s",
- BIGTABLE_EMULATOR.getPort(), table);
- }
-
- private static void writeRow(String key, String table) throws Exception {
- emulatorWrapper.writeRow(key, table, FAMILY_TEST, BOOL_COLUMN, booleanToByteArray(true), NOW);
- emulatorWrapper.writeRow(
- key, table, FAMILY_TEST, BOOL_COLUMN, booleanToByteArray(false), LATER);
- emulatorWrapper.writeRow(
- key, table, FAMILY_TEST, STRING_COLUMN, "string1".getBytes(UTF_8), NOW);
- emulatorWrapper.writeRow(
- key, table, FAMILY_TEST, STRING_COLUMN, "string2".getBytes(UTF_8), LATER);
- emulatorWrapper.writeRow(key, table, FAMILY_TEST, LONG_COLUMN, Longs.toByteArray(1L), NOW);
- emulatorWrapper.writeRow(key, table, FAMILY_TEST, LONG_COLUMN, Longs.toByteArray(2L), LATER);
- emulatorWrapper.writeRow(key, table, FAMILY_TEST, DOUBLE_COLUMN, doubleToByteArray(1.10), NOW);
- emulatorWrapper.writeRow(
- key, table, FAMILY_TEST, DOUBLE_COLUMN, doubleToByteArray(2.20), LATER);
- emulatorWrapper.writeRow(
- key, table, FAMILY_TEST, BINARY_COLUMN, "blob1".getBytes(UTF_8), LATER);
- emulatorWrapper.writeRow(
- key, table, FAMILY_TEST, BINARY_COLUMN, "blob2".getBytes(UTF_8), LATER);
- }
-}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableTestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableTestUtils.java
new file mode 100644
index 0000000..4c466fa
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableTestUtils.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.bigtable;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.toList;
+import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.KEY;
+import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.LABELS;
+import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.TIMESTAMP_MICROS;
+import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.VALUE;
+import static org.apache.beam.sdk.io.gcp.testing.BigtableUtils.booleanToByteArray;
+import static org.apache.beam.sdk.io.gcp.testing.BigtableUtils.byteString;
+import static org.apache.beam.sdk.io.gcp.testing.BigtableUtils.byteStringUtf8;
+import static org.apache.beam.sdk.io.gcp.testing.BigtableUtils.doubleToByteArray;
+import static org.apache.beam.sdk.io.gcp.testing.BigtableUtils.longToByteArray;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.fail;
+
+import com.google.bigtable.v2.Cell;
+import com.google.bigtable.v2.Column;
+import com.google.bigtable.v2.Family;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+class BigtableTableTestUtils {
+
+ static final String KEY1 = "key1";
+ static final String KEY2 = "key2";
+
+ static final String BOOL_COLUMN = "boolColumn";
+ static final String LONG_COLUMN = "longColumn";
+ static final String STRING_COLUMN = "stringColumn";
+ static final String DOUBLE_COLUMN = "doubleColumn";
+ static final String FAMILY_TEST = "familyTest";
+
+ static final Schema LONG_COLUMN_SCHEMA =
+ Schema.builder()
+ .addInt64Field(VALUE)
+ .addInt64Field(TIMESTAMP_MICROS)
+ .addArrayField(LABELS, Schema.FieldType.STRING)
+ .build();
+
+ static final Schema TEST_FAMILY_SCHEMA =
+ Schema.builder()
+ .addBooleanField(BOOL_COLUMN)
+ .addRowField(LONG_COLUMN, LONG_COLUMN_SCHEMA)
+ .addArrayField(STRING_COLUMN, Schema.FieldType.STRING)
+ .addDoubleField(DOUBLE_COLUMN)
+ .build();
+
+ static final Schema TEST_SCHEMA =
+ Schema.builder().addStringField(KEY).addRowField(FAMILY_TEST, TEST_FAMILY_SCHEMA).build();
+
+ static final Schema TEST_FLAT_SCHEMA =
+ Schema.builder()
+ .addStringField(KEY)
+ .addBooleanField(BOOL_COLUMN)
+ .addInt64Field(LONG_COLUMN)
+ .addStringField(STRING_COLUMN)
+ .addDoubleField(DOUBLE_COLUMN)
+ .build();
+
+ static final long NOW = 5_000_000_000L;
+ static final long LATER = NOW + 1_000L;
+
+ static String createFlatTableString(String table, String location) {
+ return String.format(
+ "CREATE EXTERNAL TABLE `%s`( \n"
+ + " key VARCHAR NOT NULL, \n"
+ + " boolColumn BOOLEAN NOT NULL, \n"
+ + " longColumn BIGINT NOT NULL, \n"
+ + " stringColumn VARCHAR NOT NULL, \n"
+ + " doubleColumn DOUBLE NOT NULL \n"
+ + ") \n"
+ + "TYPE bigtable \n"
+ + "LOCATION '%s' \n"
+ + "TBLPROPERTIES '{ \n"
+ + " \"columnsMapping\": \"%s\"}'",
+ table, location, columnsMappingString());
+ }
+
+ static String createFullTableString(String tableId, String location) {
+ return String.format(
+ "CREATE EXTERNAL TABLE `%s`( \n"
+ + " key VARCHAR NOT NULL, \n"
+ + " familyTest ROW< \n"
+ + " boolColumn BOOLEAN NOT NULL, \n"
+ + " longColumn ROW< \n"
+ + " val BIGINT NOT NULL, \n"
+ + " timestampMicros BIGINT NOT NULL, \n"
+ + " labels ARRAY<VARCHAR> NOT NULL \n"
+ + " > NOT NULL, \n"
+ + " stringColumn ARRAY<VARCHAR> NOT NULL, \n"
+ + " doubleColumn DOUBLE NOT NULL \n"
+ + " > NOT NULL \n"
+ + ") \n"
+ + "TYPE bigtable \n"
+ + "LOCATION '%s'",
+ tableId, location);
+ }
+
+ static Schema expectedFullSchema() {
+ return Schema.builder()
+ .addStringField(KEY)
+ .addBooleanField(BOOL_COLUMN)
+ .addInt64Field("longValue")
+ .addInt64Field(TIMESTAMP_MICROS)
+ .addArrayField(LABELS, Schema.FieldType.STRING)
+ .addArrayField(STRING_COLUMN, Schema.FieldType.STRING)
+ .addDoubleField(DOUBLE_COLUMN)
+ .build();
+ }
+
+ static Row expectedFullRow(String key) {
+ return Row.withSchema(expectedFullSchema())
+ .attachValues(
+ key,
+ false,
+ 2L,
+ LATER,
+ ImmutableList.of(),
+ ImmutableList.of("string1", "string2"),
+ 2.20);
+ }
+
+ static Row flatRow(String key) {
+ return Row.withSchema(TEST_FLAT_SCHEMA).attachValues(key, false, 2L, "string2", 2.20);
+ }
+
+ static String location(
+ String project, String instanceId, String tableId, @Nullable Integer emulatorPort) {
+ String host = emulatorPort == null ? "googleapis.com" : "localhost:" + emulatorPort;
+ return String.format(
+ "%s/bigtable/projects/%s/instances/%s/tables/%s", host, project, instanceId, tableId);
+ }
+
+ static String columnsMappingString() {
+ return "familyTest:boolColumn,familyTest:longColumn,familyTest:doubleColumn,"
+ + "familyTest:stringColumn";
+ }
+
+ static void createReadTable(String table, BigtableClientWrapper clientWrapper) {
+ clientWrapper.createTable(table, FAMILY_TEST);
+ writeRow(KEY1, table, clientWrapper);
+ writeRow(KEY2, table, clientWrapper);
+ }
+
+ static com.google.bigtable.v2.Row bigTableRow() {
+ List<Column> columns =
+ ImmutableList.of(
+ column("boolColumn", booleanToByteArray(true)),
+ column("doubleColumn", doubleToByteArray(5.5)),
+ column("longColumn", Longs.toByteArray(10L)),
+ column("stringColumn", "stringValue".getBytes(UTF_8)));
+ Family family = Family.newBuilder().setName("familyTest").addAllColumns(columns).build();
+ return com.google.bigtable.v2.Row.newBuilder()
+ .setKey(byteStringUtf8("key"))
+ .addFamilies(family)
+ .build();
+ }
+
+ // There is no possibility to insert a value with fixed timestamp so we have to replace it
+ // for the testing purpose.
+ static com.google.bigtable.v2.Row setFixedTimestamp(com.google.bigtable.v2.Row row) {
+ Family family = row.getFamilies(0);
+
+ List<Column> columnsReplaced =
+ family.getColumnsList().stream()
+ .map(
+ column -> {
+ Cell cell = column.getCells(0);
+ return column(
+ column.getQualifier().toStringUtf8(), cell.getValue().toByteArray());
+ })
+ .collect(toList());
+ Family familyReplaced =
+ Family.newBuilder().setName(family.getName()).addAllColumns(columnsReplaced).build();
+ return com.google.bigtable.v2.Row.newBuilder()
+ .setKey(row.getKey())
+ .addFamilies(familyReplaced)
+ .build();
+ }
+
+ static void checkMessage(@Nullable String message, String substring) {
+ if (message != null) {
+ assertThat(message, containsString(substring));
+ } else {
+ fail();
+ }
+ }
+
+ private static Column column(String qualifier, byte[] value) {
+ return Column.newBuilder()
+ .setQualifier(byteStringUtf8(qualifier))
+ .addCells(cell(value))
+ .build();
+ }
+
+ private static Cell cell(byte[] value) {
+ return Cell.newBuilder().setValue(byteString(value)).setTimestampMicros(NOW).build();
+ }
+
+ private static void writeRow(String key, String table, BigtableClientWrapper clientWrapper) {
+ clientWrapper.writeRow(key, table, FAMILY_TEST, BOOL_COLUMN, booleanToByteArray(true), NOW);
+ clientWrapper.writeRow(key, table, FAMILY_TEST, BOOL_COLUMN, booleanToByteArray(false), LATER);
+ clientWrapper.writeRow(key, table, FAMILY_TEST, STRING_COLUMN, "string1".getBytes(UTF_8), NOW);
+ clientWrapper.writeRow(
+ key, table, FAMILY_TEST, STRING_COLUMN, "string2".getBytes(UTF_8), LATER);
+ clientWrapper.writeRow(key, table, FAMILY_TEST, LONG_COLUMN, longToByteArray(1L), NOW);
+ clientWrapper.writeRow(key, table, FAMILY_TEST, LONG_COLUMN, longToByteArray(2L), LATER);
+ clientWrapper.writeRow(key, table, FAMILY_TEST, DOUBLE_COLUMN, doubleToByteArray(1.10), NOW);
+ clientWrapper.writeRow(key, table, FAMILY_TEST, DOUBLE_COLUMN, doubleToByteArray(2.20), LATER);
+ }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableWithRowsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableWithRowsTest.java
index 7b0f2b5..b3d02ef 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableWithRowsTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigtable/BigtableTableWithRowsTest.java
@@ -17,62 +17,65 @@
*/
package org.apache.beam.sdk.extensions.sql.meta.provider.bigtable;
-import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.stream.Collectors.toList;
-import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.KEY;
-import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.LABELS;
-import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.TIMESTAMP_MICROS;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.BINARY_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.BOOL_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.DOUBLE_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.KEY1;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.KEY2;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.LATER;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.STRING_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.TEST_SCHEMA;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.KEY1;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.KEY2;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.STRING_COLUMN;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.TEST_SCHEMA;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.createFullTableString;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.createReadTable;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.expectedFullRow;
+import static org.apache.beam.sdk.extensions.sql.meta.provider.bigtable.BigtableTableTestUtils.expectedFullSchema;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import com.google.cloud.bigtable.emulator.v2.BigtableEmulatorRule;
+import java.io.IOException;
import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
-import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
import org.junit.Test;
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
-public class BigtableTableWithRowsTest extends BigtableTableTest {
-
- private String createTableString() {
- return "CREATE EXTERNAL TABLE beamTable( \n"
- + " key VARCHAR NOT NULL, \n"
- + " familyTest ROW< \n"
- + " boolColumn BOOLEAN NOT NULL, \n"
- + " longColumn ROW< \n"
- + " val BIGINT NOT NULL, \n"
- + " timestampMicros BIGINT NOT NULL, \n"
- + " labels ARRAY<VARCHAR> NOT NULL \n"
- + " > NOT NULL, \n"
- + " stringColumn ARRAY<VARCHAR> NOT NULL, \n"
- + " doubleColumn DOUBLE NOT NULL, \n"
- + " binaryColumn BINARY NOT NULL \n"
- + " > NOT NULL \n"
- + ") \n"
- + "TYPE bigtable \n"
- + "LOCATION '"
- + getLocation("beamTable")
- + "'";
+public class BigtableTableWithRowsTest {
+
+ @ClassRule
+ public static final BigtableEmulatorRule BIGTABLE_EMULATOR = BigtableEmulatorRule.create();
+
+ @Rule public TestPipeline readPipeline = TestPipeline.create();
+
+ private static BigtableClientWrapper emulatorWrapper;
+
+ private static final String PROJECT = "fakeProject";
+ private static final String INSTANCE = "fakeInstance";
+ private static final String TABLE = "beamTable";
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ emulatorWrapper =
+ new BigtableClientWrapper("fakeProject", "fakeInstance", BIGTABLE_EMULATOR.getPort(), null);
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ emulatorWrapper.closeSession();
}
@Test
@@ -81,7 +84,7 @@ public class BigtableTableWithRowsTest extends BigtableTableTest {
metaStore.registerProvider(new BigtableTableProvider());
BeamSqlCli cli = new BeamSqlCli().metaStore(metaStore);
- cli.execute(createTableString());
+ cli.execute(createFullTableString(TABLE, location()));
Table table = metaStore.getTables().get("beamTable");
assertNotNull(table);
@@ -89,59 +92,36 @@ public class BigtableTableWithRowsTest extends BigtableTableTest {
}
@Test
- public void testSimpleSelect() throws Exception {
- createReadTable("beamTable");
+ public void testSimpleSelect() {
+ createReadTable(TABLE, emulatorWrapper);
BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new BigtableTableProvider());
- sqlEnv.executeDdl(createTableString());
+ sqlEnv.executeDdl(createFullTableString(TABLE, location()));
String query =
- ""
- + "SELECT key, \n"
+ "SELECT key, \n"
+ " bt.familyTest.boolColumn, \n"
+ " bt.familyTest.longColumn.val AS longValue, \n"
+ " bt.familyTest.longColumn.timestampMicros, \n"
+ " bt.familyTest.longColumn.labels, \n"
+ " bt.familyTest.stringColumn, \n"
- + " bt.familyTest.doubleColumn, \n"
- + " bt.familyTest.binaryColumn \n"
+ + " bt.familyTest.doubleColumn \n"
+ "FROM beamTable bt";
sqlEnv.parseQuery(query);
PCollection<Row> queryOutput =
BeamSqlRelUtils.toPCollection(readPipeline, sqlEnv.parseQuery(query));
- assertThat(queryOutput.getSchema(), equalTo(expectedSchema()));
+ assertThat(queryOutput.getSchema(), equalTo(expectedFullSchema()));
PCollection<Row> sorted =
- queryOutput.apply(MapElements.via(new SortByTimestamp())).setRowSchema(expectedSchema());
+ queryOutput
+ .apply(MapElements.via(new SortByTimestamp()))
+ .setRowSchema(expectedFullSchema());
- PAssert.that(sorted)
- .containsInAnyOrder(row(expectedSchema(), KEY1), row(expectedSchema(), KEY2));
+ PAssert.that(sorted).containsInAnyOrder(expectedFullRow(KEY1), expectedFullRow(KEY2));
readPipeline.run().waitUntilFinish();
}
- private static Schema expectedSchema() {
- return Schema.builder()
- .addStringField(KEY)
- .addBooleanField(BOOL_COLUMN)
- .addInt64Field("longValue")
- .addInt64Field(TIMESTAMP_MICROS)
- .addArrayField(LABELS, Schema.FieldType.STRING)
- .addArrayField(STRING_COLUMN, Schema.FieldType.STRING)
- .addDoubleField(DOUBLE_COLUMN)
- .addByteArrayField(BINARY_COLUMN)
- .build();
- }
-
- private static Row row(Schema schema, String key) {
- return Row.withSchema(schema)
- .attachValues(
- key,
- false,
- 2L,
- LATER,
- ImmutableList.of(),
- ImmutableList.of("string1", "string2"),
- 2.20,
- "blob2".getBytes(UTF_8));
+ private String location() {
+ return BigtableTableTestUtils.location(PROJECT, INSTANCE, TABLE, BIGTABLE_EMULATOR.getPort());
}
private static class SortByTimestamp extends SimpleFunction<Row, Row> {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigtableEmulatorWrapper.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigtableEmulatorWrapper.java
deleted file mode 100644
index 2a4b87e..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigtableEmulatorWrapper.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.testing;
-
-import com.google.api.core.ApiFuture;
-import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
-import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
-import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
-import com.google.cloud.bigtable.data.v2.BigtableDataClient;
-import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
-import com.google.cloud.bigtable.data.v2.models.RowMutation;
-import com.google.protobuf.ByteString;
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-
-public class BigtableEmulatorWrapper {
- private final BigtableTableAdminClient tableAdminClient;
- private final BigtableDataClient dataClient;
-
- public BigtableEmulatorWrapper(int emulatorPort, String projectId, String instanceId)
- throws IOException {
- BigtableTableAdminSettings.Builder tableAdminSettings =
- BigtableTableAdminSettings.newBuilderForEmulator(emulatorPort)
- .setProjectId(projectId)
- .setInstanceId(instanceId);
- tableAdminClient = BigtableTableAdminClient.create(tableAdminSettings.build());
-
- BigtableDataSettings.Builder dataSettings =
- BigtableDataSettings.newBuilderForEmulator(emulatorPort)
- .setProjectId(projectId)
- .setInstanceId(instanceId);
- dataClient = BigtableDataClient.create(dataSettings.build());
- }
-
- public void writeRow(
- String key,
- String table,
- String familyColumn,
- String columnQualifier,
- byte[] value,
- long timestampMicros)
- throws ExecutionException, InterruptedException {
- ApiFuture<Void> mutateFuture =
- dataClient.mutateRowAsync(
- RowMutation.create(table, key)
- .setCell(
- familyColumn,
- ByteString.copyFromUtf8(columnQualifier),
- timestampMicros,
- ByteString.copyFrom(value)));
- mutateFuture.get();
- }
-
- public void createTable(String tableName, String... families) {
- CreateTableRequest request = CreateTableRequest.of(tableName);
- ImmutableList.copyOf(families).forEach(request::addFamily);
- tableAdminClient.createTable(request);
- }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigtableTestUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigtableTestUtils.java
deleted file mode 100644
index d9796fd..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigtableTestUtils.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.testing;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.stream.Collectors.toList;
-import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.KEY;
-import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.LABELS;
-import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.TIMESTAMP_MICROS;
-import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.VALUE;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsString;
-import static org.junit.Assert.fail;
-
-import com.google.bigtable.v2.Cell;
-import com.google.bigtable.v2.Column;
-import com.google.bigtable.v2.Family;
-import com.google.protobuf.ByteString;
-import java.util.List;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;
-import org.checkerframework.checker.nullness.qual.Nullable;
-
-@SuppressWarnings({
- "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-public class BigtableTestUtils {
-
- public static final String KEY1 = "key1";
- public static final String KEY2 = "key2";
-
- public static final String BOOL_COLUMN = "boolColumn";
- public static final String LONG_COLUMN = "longColumn";
- public static final String STRING_COLUMN = "stringColumn";
- public static final String DOUBLE_COLUMN = "doubleColumn";
- public static final String BINARY_COLUMN = "binaryColumn";
- public static final String FAMILY_TEST = "familyTest";
-
- public static final Schema LONG_COLUMN_SCHEMA =
- Schema.builder()
- .addInt64Field(VALUE)
- .addInt64Field(TIMESTAMP_MICROS)
- .addArrayField(LABELS, Schema.FieldType.STRING)
- .build();
-
- public static final Schema TEST_FAMILY_SCHEMA =
- Schema.builder()
- .addBooleanField(BOOL_COLUMN)
- .addRowField(LONG_COLUMN, LONG_COLUMN_SCHEMA)
- .addArrayField(STRING_COLUMN, Schema.FieldType.STRING)
- .addDoubleField(DOUBLE_COLUMN)
- .addByteArrayField(BINARY_COLUMN)
- .build();
-
- public static final Schema TEST_SCHEMA =
- Schema.builder().addStringField(KEY).addRowField(FAMILY_TEST, TEST_FAMILY_SCHEMA).build();
-
- public static final Schema TEST_FLAT_SCHEMA =
- Schema.builder()
- .addStringField(KEY)
- .addBooleanField(BOOL_COLUMN)
- .addInt64Field(LONG_COLUMN)
- .addStringField(STRING_COLUMN)
- .addDoubleField(DOUBLE_COLUMN)
- .build();
-
- public static final long NOW = 5_000_000_000L;
- public static final long LATER = NOW + 1_000L;
-
- public static byte[] floatToByteArray(float number) {
- return Ints.toByteArray(Float.floatToIntBits(number));
- }
-
- public static byte[] doubleToByteArray(double number) {
- return Longs.toByteArray(Double.doubleToLongBits(number));
- }
-
- public static byte[] booleanToByteArray(boolean condition) {
- return condition ? new byte[] {1} : new byte[] {0};
- }
-
- public static void checkMessage(@Nullable String message, String substring) {
- if (message != null) {
- assertThat(message, containsString(substring));
- } else {
- fail();
- }
- }
-
- public static com.google.bigtable.v2.Row bigTableRow() {
- List<Column> columns =
- ImmutableList.of(
- column("boolColumn", booleanToByteArray(true)),
- column("doubleColumn", doubleToByteArray(5.5)),
- column("longColumn", Longs.toByteArray(10L)),
- column("stringColumn", "stringValue".getBytes(UTF_8)));
- Family family = Family.newBuilder().setName("familyTest").addAllColumns(columns).build();
- return com.google.bigtable.v2.Row.newBuilder()
- .setKey(ByteString.copyFromUtf8("key"))
- .addFamilies(family)
- .build();
- }
-
- // There is no possibility to insert a value with fixed timestamp so we have to replace it
- // for the testing purpose.
- public static com.google.bigtable.v2.Row setFixedTimestamp(com.google.bigtable.v2.Row row) {
- Family family = row.getFamilies(0);
-
- List<Column> columnsReplaced =
- family.getColumnsList().stream()
- .map(
- column -> {
- Cell cell = column.getCells(0);
- return column(
- column.getQualifier().toStringUtf8(), cell.getValue().toByteArray());
- })
- .collect(toList());
- Family familyReplaced =
- Family.newBuilder().setName(family.getName()).addAllColumns(columnsReplaced).build();
- return com.google.bigtable.v2.Row.newBuilder()
- .setKey(row.getKey())
- .addFamilies(familyReplaced)
- .build();
- }
-
- private static Column column(String qualifier, byte[] value) {
- return Column.newBuilder()
- .setQualifier(ByteString.copyFromUtf8(qualifier))
- .addCells(cell(value))
- .build();
- }
-
- private static Cell cell(byte[] value) {
- return Cell.newBuilder().setValue(ByteString.copyFrom(value)).setTimestampMicros(NOW).build();
- }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigtableUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigtableUtils.java
new file mode 100644
index 0000000..67ed384
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigtableUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.testing;
+
+import com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;
+
+public class BigtableUtils {
+
+ public static ByteString byteString(byte[] bytes) {
+ return ByteString.copyFrom(bytes);
+ }
+
+ public static ByteString byteStringUtf8(String s) {
+ return ByteString.copyFromUtf8(s);
+ }
+
+ public static byte[] floatToByteArray(float number) {
+ return Ints.toByteArray(Float.floatToIntBits(number));
+ }
+
+ public static byte[] longToByteArray(long number) {
+ return Longs.toByteArray(number);
+ }
+
+ public static byte[] doubleToByteArray(double number) {
+ return Longs.toByteArray(Double.doubleToLongBits(number));
+ }
+
+ public static byte[] booleanToByteArray(boolean condition) {
+ return condition ? new byte[] {1} : new byte[] {0};
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BeamRowToBigtableMutationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BeamRowToBigtableMutationTest.java
index 2583ce3..5fa16db 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BeamRowToBigtableMutationTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BeamRowToBigtableMutationTest.java
@@ -18,13 +18,13 @@
package org.apache.beam.sdk.io.gcp.bigtable;
import static java.util.stream.Collectors.toList;
-import static org.apache.beam.sdk.io.gcp.bigtable.TestUtils.rowMutation;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.BOOL_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.DOUBLE_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.FAMILY_TEST;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.LONG_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.STRING_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.TEST_FLAT_SCHEMA;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.BOOL_COLUMN;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.DOUBLE_COLUMN;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.FAMILY_TEST;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.LONG_COLUMN;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.STRING_COLUMN;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.TEST_FLAT_SCHEMA;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.rowMutation;
import com.google.bigtable.v2.Mutation;
import com.google.protobuf.ByteString;
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableRowToBeamRowFlatTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableRowToBeamRowFlatTest.java
index f201b3a..ef59c6a 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableRowToBeamRowFlatTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableRowToBeamRowFlatTest.java
@@ -17,13 +17,13 @@
*/
package org.apache.beam.sdk.io.gcp.bigtable;
-import static org.apache.beam.sdk.io.gcp.bigtable.TestUtils.bigtableRow;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.BOOL_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.DOUBLE_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.FAMILY_TEST;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.LONG_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.STRING_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.TEST_FLAT_SCHEMA;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.BOOL_COLUMN;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.DOUBLE_COLUMN;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.FAMILY_TEST;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.LONG_COLUMN;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.STRING_COLUMN;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.TEST_FLAT_SCHEMA;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.bigtableRow;
import java.util.Map;
import java.util.Set;
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableRowToBeamRowTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableRowToBeamRowTest.java
index 0a683ed..a7b9f6f 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableRowToBeamRowTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableRowToBeamRowTest.java
@@ -18,13 +18,13 @@
package org.apache.beam.sdk.io.gcp.bigtable;
import static java.util.stream.Collectors.toList;
-import static org.apache.beam.sdk.io.gcp.bigtable.TestUtils.bigtableRow;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.FAMILY_TEST;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.LATER;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.LONG_COLUMN_SCHEMA;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.STRING_COLUMN;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.TEST_FAMILY_SCHEMA;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.TEST_SCHEMA;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.FAMILY_TEST;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.LATER;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.LONG_COLUMN_SCHEMA;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.STRING_COLUMN;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.TEST_FAMILY_SCHEMA;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.TEST_SCHEMA;
+import static org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils.bigtableRow;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.testing.PAssert;
@@ -71,8 +71,7 @@ public class BigtableRowToBeamRowTest {
false,
Row.withSchema(LONG_COLUMN_SCHEMA).attachValues(2L, LATER, ImmutableList.of("label1")),
ImmutableList.of("value1", "value2"),
- 5.5,
- new byte[] {2, 1, 0});
+ 5.5);
}
private static class SortStringColumn extends SimpleFunction<Row, Row> {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/TestUtils.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestUtils.java
similarity index 71%
rename from sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/TestUtils.java
rename to sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestUtils.java
index d7345cb..5c5af10 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/TestUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestUtils.java
@@ -18,11 +18,12 @@
package org.apache.beam.sdk.io.gcp.bigtable;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.FAMILY_TEST;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.LATER;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.NOW;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.booleanToByteArray;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.doubleToByteArray;
+import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.KEY;
+import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.LABELS;
+import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.TIMESTAMP_MICROS;
+import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.VALUE;
+import static org.apache.beam.sdk.io.gcp.testing.BigtableUtils.booleanToByteArray;
+import static org.apache.beam.sdk.io.gcp.testing.BigtableUtils.doubleToByteArray;
import com.google.bigtable.v2.Cell;
import com.google.bigtable.v2.Column;
@@ -30,11 +31,48 @@ import com.google.bigtable.v2.Family;
import com.google.bigtable.v2.Mutation;
import com.google.protobuf.ByteString;
import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;
-public class TestUtils {
+class BigtableTestUtils {
+
+ static final String BOOL_COLUMN = "boolColumn";
+ static final String LONG_COLUMN = "longColumn";
+ static final String STRING_COLUMN = "stringColumn";
+ static final String DOUBLE_COLUMN = "doubleColumn";
+ static final String FAMILY_TEST = "familyTest";
+
+ static final Schema LONG_COLUMN_SCHEMA =
+ Schema.builder()
+ .addInt64Field(VALUE)
+ .addInt64Field(TIMESTAMP_MICROS)
+ .addArrayField(LABELS, Schema.FieldType.STRING)
+ .build();
+
+ static final Schema TEST_FAMILY_SCHEMA =
+ Schema.builder()
+ .addBooleanField(BOOL_COLUMN)
+ .addRowField(LONG_COLUMN, LONG_COLUMN_SCHEMA)
+ .addArrayField(STRING_COLUMN, Schema.FieldType.STRING)
+ .addDoubleField(DOUBLE_COLUMN)
+ .build();
+
+ static final Schema TEST_SCHEMA =
+ Schema.builder().addStringField(KEY).addRowField(FAMILY_TEST, TEST_FAMILY_SCHEMA).build();
+
+ static final Schema TEST_FLAT_SCHEMA =
+ Schema.builder()
+ .addStringField(KEY)
+ .addBooleanField(BOOL_COLUMN)
+ .addInt64Field(LONG_COLUMN)
+ .addStringField(STRING_COLUMN)
+ .addDoubleField(DOUBLE_COLUMN)
+ .build();
+
+ static final long NOW = 5_000_000_000L;
+ static final long LATER = NOW + 1_000L;
static com.google.bigtable.v2.Row bigtableRow(long i) {
return com.google.bigtable.v2.Row.newBuilder()
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/CellValueParserTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/CellValueParserTest.java
index fccb20d..4bf776b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/CellValueParserTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/CellValueParserTest.java
@@ -18,7 +18,6 @@
package org.apache.beam.sdk.io.gcp.bigtable;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.beam.sdk.io.gcp.testing.BigtableTestUtils.checkMessage;
import static org.apache.beam.sdk.schemas.Schema.TypeName.BOOLEAN;
import static org.apache.beam.sdk.schemas.Schema.TypeName.BYTE;
import static org.apache.beam.sdk.schemas.Schema.TypeName.BYTES;
@@ -30,11 +29,15 @@ import static org.apache.beam.sdk.schemas.Schema.TypeName.INT32;
import static org.apache.beam.sdk.schemas.Schema.TypeName.INT64;
import static org.apache.beam.sdk.schemas.Schema.TypeName.MAP;
import static org.apache.beam.sdk.schemas.Schema.TypeName.STRING;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.fail;
import com.google.bigtable.v2.Cell;
import com.google.protobuf.ByteString;
+import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.Test;
@@ -274,4 +277,12 @@ public class CellValueParserTest {
private Cell cell(byte[] value) {
return Cell.newBuilder().setValue(ByteString.copyFrom(value)).build();
}
+
+ private void checkMessage(@Nullable String message, String substring) {
+ if (message != null) {
+ assertThat(message, containsString(substring));
+ } else {
+ fail();
+ }
+ }
}