You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2023/02/09 05:00:23 UTC
[beam] branch master updated: Adding SpannerIO.readChangeStreams support for SchemaTransform (#24999)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d36f34df570 Adding SpannerIO.readChangeStreams support for SchemaTransform (#24999)
d36f34df570 is described below
commit d36f34df570b845a5817ba38d5f16b5f12b2a825
Author: Pablo Estrada <pa...@users.noreply.github.com>
AuthorDate: Wed Feb 8 21:00:16 2023 -0800
Adding SpannerIO.readChangeStreams support for SchemaTransform (#24999)
* Adding SpannerIO.readChangeStreams support for SchemaTransform
* Improve test and schema shape to include changestream information
* add schema checks
* fixup
* remove unneeded file
* reducing the number of tables that we cache
---
.../beam/sdk/io/gcp/spanner/ReadSpannerSchema.java | 37 +-
.../beam/sdk/io/gcp/spanner/SpannerSchema.java | 8 +-
...erChangestreamsReadSchemaTransformProvider.java | 394 +++++++++++++++++++++
.../it/SpannerChangeStreamsSchemaTransformIT.java | 262 ++++++++++++++
4 files changed, 695 insertions(+), 6 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java
index 9b30747f778..9c82701e5a4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java
@@ -22,6 +22,8 @@ import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.ReadOnlyTransaction;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Statement;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.PCollectionView;
@@ -32,17 +34,44 @@ import org.apache.beam.sdk.values.PCollectionView;
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
-class ReadSpannerSchema extends DoFn<Void, SpannerSchema> {
+public class ReadSpannerSchema extends DoFn<Void, SpannerSchema> {
private final SpannerConfig config;
private final PCollectionView<Dialect> dialectView;
+ private final Set<String> allowedTableNames;
+
private transient SpannerAccessor spannerAccessor;
+ /**
+ * Constructor for creating an instance of the ReadSpannerSchema class that reads the schema of
+ * every table in the database (equivalent to passing an empty {@code allowedTableNames} set).
+ *
+ * @param config The SpannerConfig object that contains the configuration for accessing the
+ * Spanner database.
+ * @param dialectView A PCollectionView object that holds a Dialect object for the database
+ * dialect to use for reading the Spanner schema.
+ */
public ReadSpannerSchema(SpannerConfig config, PCollectionView<Dialect> dialectView) {
+ this(config, dialectView, new HashSet<String>());
+ }
+
+ /**
+ * Constructor for creating an instance of the ReadSpannerSchema class.
+ *
+ * @param config The SpannerConfig object that contains the configuration for accessing the
+ * Spanner database.
+ * @param dialectView A PCollectionView object that holds a Dialect object for the database
+ * dialect to use for reading the Spanner schema.
+ * @param allowedTableNames A set of allowed table names to be used when reading the Spanner
+ * schema.
+ */
+ public ReadSpannerSchema(
+ SpannerConfig config, PCollectionView<Dialect> dialectView, Set<String> allowedTableNames) {
this.config = config;
this.dialectView = dialectView;
+ this.allowedTableNames = allowedTableNames == null ? new HashSet<>() : allowedTableNames;
}
@Setup
@@ -68,7 +97,11 @@ class ReadSpannerSchema extends DoFn<Void, SpannerSchema> {
String columnName = resultSet.getString(1);
String type = resultSet.getString(2);
long cellsMutated = resultSet.getLong(3);
-
+ if (allowedTableNames.size() > 0 && !allowedTableNames.contains(tableName)) {
+ // If we want to filter out table names, and the current table name is not part
+ // of the allowed names, we exclude it.
+ continue;
+ }
builder.addColumn(tableName, columnName, type, cellsMutated);
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java
index 78941d12a7f..09394d37a44 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java
@@ -34,7 +34,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
-abstract class SpannerSchema implements Serializable {
+public abstract class SpannerSchema implements Serializable {
abstract ImmutableList<String> tables();
abstract Dialect dialect();
@@ -135,18 +135,18 @@ abstract class SpannerSchema implements Serializable {
}
@AutoValue
- abstract static class KeyPart implements Serializable {
+ public abstract static class KeyPart implements Serializable {
static KeyPart create(String field, boolean desc) {
return new AutoValue_SpannerSchema_KeyPart(field, desc);
}
- abstract String getField();
+ public abstract String getField();
abstract boolean isDesc();
}
@AutoValue
- abstract static class Column implements Serializable {
+ public abstract static class Column implements Serializable {
static Column create(String name, Type type) {
return new AutoValue_SpannerSchema_Column(name, type);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider.java
new file mode 100644
index 00000000000..150895c19e3
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Dialect;
+import com.google.cloud.spanner.Type;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.gcp.spanner.ReadSpannerSchema;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerSchema;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.gson.Gson;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+
+/**
+ * A {@link TypedSchemaTransformProvider} that reads a Spanner change stream and emits one {@link
+ * Row} per {@link Mod} of a single table on the {@code "output"} collection.
+ *
+ * <p>Each output row has the fields {@code operation} (the mod type), {@code commitTimestamp},
+ * {@code recordSequence} and {@code rowValues}. {@code rowValues} is a nested row whose nullable
+ * fields mirror the table's columns and whose {@code primaryKeyColumns} option lists the key
+ * columns.
+ */
+public class SpannerChangestreamsReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<
+        SpannerChangestreamsReadSchemaTransformProvider.SpannerChangestreamsReadConfiguration> {
+  @Override
+  protected @UnknownKeyFor @NonNull @Initialized Class<SpannerChangestreamsReadConfiguration>
+      configurationClass() {
+    return SpannerChangestreamsReadConfiguration.class;
+  }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+      SpannerChangestreamsReadSchemaTransformProvider.SpannerChangestreamsReadConfiguration
+          configuration) {
+    return new SchemaTransform() {
+      @Override
+      public @UnknownKeyFor @NonNull @Initialized PTransform<
+              @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple,
+              @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple>
+          buildTransform() {
+        return new PTransform<PCollectionRowTuple, PCollectionRowTuple>() {
+          @Override
+          public PCollectionRowTuple expand(PCollectionRowTuple input) {
+            Pipeline p = input.getPipeline();
+            // Resolving the table schema runs a separate, blocking pipeline at expansion time.
+            // TODO(pabloem): Does this action create/destroy a new metadata table??
+            Schema tableChangesSchema = getTableSchema(configuration);
+            SpannerIO.ReadChangeStream readChangeStream =
+                SpannerIO.readChangeStream()
+                    .withSpannerConfig(
+                        SpannerConfig.create()
+                            .withProjectId(configuration.getProjectId())
+                            .withInstanceId(configuration.getInstanceId())
+                            .withDatabaseId(configuration.getDatabaseId()))
+                    .withChangeStreamName(configuration.getChangeStreamName())
+                    .withInclusiveStartAt(
+                        Timestamp.parseTimestamp(configuration.getStartAtTimestamp()))
+                    .withDatabaseId(configuration.getDatabaseId())
+                    .withProjectId(configuration.getProjectId())
+                    .withInstanceId(configuration.getInstanceId());
+
+            // Read the optional end timestamp once so the null check refines the local.
+            String endTs = configuration.getEndAtTimestamp();
+            if (endTs != null) {
+              readChangeStream =
+                  readChangeStream.withInclusiveEndAt(Timestamp.parseTimestamp(endTs));
+            }
+            return PCollectionRowTuple.of(
+                "output",
+                p.apply(readChangeStream)
+                    .apply(
+                        ParDo.of(
+                            new DataChangeRecordToRow(
+                                configuration.getTable(), tableChangesSchema)))
+                    .setRowSchema(tableChangesSchema));
+          }
+        };
+      }
+    };
+  }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized String identifier() {
+    return "beam:schematransform:org.apache.beam:spanner_cdc_read:v1";
+  }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
+      inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
+      outputCollectionNames() {
+    return Collections.singletonList("output");
+  }
+
+  /** Configuration for {@link SpannerChangestreamsReadSchemaTransformProvider}. */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class SpannerChangestreamsReadConfiguration implements Serializable {
+    public abstract String getDatabaseId();
+
+    public abstract String getProjectId();
+
+    public abstract String getInstanceId();
+
+    /** The table whose changes are emitted; records for other tables are dropped. */
+    public abstract String getTable();
+
+    /** Inclusive start of the read, as a string accepted by {@link Timestamp#parseTimestamp}. */
+    public abstract String getStartAtTimestamp();
+
+    /**
+     * Optional inclusive end of the read, as a string accepted by {@link
+     * Timestamp#parseTimestamp}. When null, no end timestamp is set on the read.
+     */
+    public abstract @Nullable String getEndAtTimestamp();
+
+    public abstract String getChangeStreamName();
+
+    public static Builder builder() {
+      return new AutoValue_SpannerChangestreamsReadSchemaTransformProvider_SpannerChangestreamsReadConfiguration
+          .Builder();
+    }
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setDatabaseId(String databaseId);
+
+      public abstract Builder setProjectId(String projectId);
+
+      public abstract Builder setInstanceId(String instanceId);
+
+      public abstract Builder setTable(String table);
+
+      public abstract Builder setStartAtTimestamp(String isoTimestamp);
+
+      public abstract Builder setEndAtTimestamp(String isoTimestamp);
+
+      public abstract Builder setChangeStreamName(String changeStreamName);
+
+      public abstract SpannerChangestreamsReadConfiguration build();
+    }
+  }
+
+  /**
+   * Converts every {@link Mod} of a {@link DataChangeRecord} for {@code tableName} into a {@link
+   * Row} of {@code tableChangeRecordSchema}. Records of any other table are dropped.
+   */
+  private static final class DataChangeRecordToRow extends DoFn<DataChangeRecord, Row> {
+    private final Schema tableChangeRecordSchema;
+    private final String tableName;
+    // Transient: rebuilt lazily by getGson() after this DoFn is deserialized.
+    private transient Gson gson;
+
+    DataChangeRecordToRow(String tableName, Schema tableChangeRecordSchema) {
+      this.tableName = tableName;
+      this.tableChangeRecordSchema = tableChangeRecordSchema;
+      this.gson = new Gson();
+    }
+
+    public Gson getGson() {
+      if (gson == null) {
+        gson = new Gson();
+      }
+      return gson;
+    }
+
+    @ProcessElement
+    public void process(@DoFn.Element DataChangeRecord record, OutputReceiver<Row> receiver) {
+      if (!record.getTableName().equalsIgnoreCase(tableName)) {
+        // If the element does not belong to the appropriate table name, we discard it.
+        return;
+      }
+      final Instant timestamp = new Instant(record.getRecordTimestamp().toSqlTimestamp());
+      // The nested row schema is the same for every mod, so resolve it once per record.
+      final Schema internalRowSchema =
+          tableChangeRecordSchema.getField("rowValues").getType().getRowSchema();
+      if (internalRowSchema == null) {
+        throw new RuntimeException("Row schema for internal row is null and cannot be utilized.");
+      }
+
+      for (Mod mod : record.getMods()) {
+        Row.FieldValueBuilder rowBuilder = Row.fromRow(Row.nullRow(internalRowSchema));
+        rowBuilder = withJsonValues(rowBuilder, internalRowSchema, mod.getNewValuesJson());
+        rowBuilder = withJsonValues(rowBuilder, internalRowSchema, mod.getKeysJson());
+        receiver.outputWithTimestamp(
+            Row.withSchema(tableChangeRecordSchema)
+                .addValue(record.getModType().toString())
+                .addValue(record.getCommitTimestamp().toString())
+                .addValue(Long.parseLong(record.getRecordSequence()))
+                .addValue(rowBuilder.build())
+                .build(),
+            timestamp);
+      }
+    }
+
+    /**
+     * Sets every non-null entry of the JSON object {@code json} (column name to value) on {@code
+     * rowBuilder}, converting each value to the type of the matching field of {@code rowSchema}.
+     * A null {@code json} sets nothing.
+     */
+    private Row.FieldValueBuilder withJsonValues(
+        Row.FieldValueBuilder rowBuilder, Schema rowSchema, @Nullable String json) {
+      for (Map.Entry<String, Object> entry : parseJsonObject(json).entrySet()) {
+        Object value = entry.getValue();
+        if (value == null) {
+          continue;
+        }
+        // TODO(pabloem): Understand why SpannerSchema has field names in lowercase...
+        String fieldName = entry.getKey().toLowerCase();
+        rowBuilder =
+            rowBuilder.withFieldValue(
+                fieldName, jsonToParsedValue(rowSchema.getField(fieldName).getType(), value));
+      }
+      return rowBuilder;
+    }
+
+    /** Parses a JSON object into a map, returning an empty map for null or JSON-null input. */
+    @SuppressWarnings("unchecked")
+    private Map<String, Object> parseJsonObject(@Nullable String json) {
+      return Optional.ofNullable(json)
+          .map(nonNullJson -> (Map<String, Object>) getGson().fromJson(nonNullJson, Map.class))
+          .orElseGet(Collections::emptyMap);
+    }
+  }
+
+  /**
+   * Hands the schema read by {@link #getTableSchema}'s helper pipeline back to the caller, keyed
+   * by a per-call unique id. Synchronized because the writing DoFn may run on a different thread
+   * than the caller.
+   */
+  private static final Map<String, SpannerSchema> TABLE_SCHEMAS =
+      Collections.synchronizedMap(new HashMap<>());
+
+  /**
+   * Reads the Spanner schema of {@code config.getTable()} by running a small blocking pipeline
+   * with {@link ReadSpannerSchema}, and converts it into the Beam schema of this transform's
+   * output.
+   *
+   * <p>NOTE(review): the result travels through the static {@link #TABLE_SCHEMAS} map, which only
+   * works if the helper pipeline executes in this JVM. The dialect is also fixed to GoogleSQL.
+   */
+  private static Schema getTableSchema(SpannerChangestreamsReadConfiguration config) {
+    // A unique key per call keeps concurrent or repeated expansions (including same-named tables
+    // in different databases) from reading each other's result.
+    final String resultKey = UUID.randomUUID().toString();
+    Pipeline miniPipeline = Pipeline.create();
+    PCollectionView<Dialect> sqlDialectView =
+        miniPipeline
+            .apply("Create Dialect", Create.of(Dialect.GOOGLE_STANDARD_SQL))
+            .apply("Dialect to View", View.asSingleton());
+    miniPipeline
+        .apply(Create.of((Void) null))
+        .apply(
+            ParDo.of(
+                    new ReadSpannerSchema(
+                        SpannerConfig.create()
+                            .withDatabaseId(config.getDatabaseId())
+                            .withInstanceId(config.getInstanceId())
+                            .withProjectId(config.getProjectId()),
+                        sqlDialectView,
+                        Sets.newHashSet(config.getTable())))
+                .withSideInput("dialect", sqlDialectView))
+        .apply(
+            ParDo.of(
+                new DoFn<SpannerSchema, String>() {
+                  @ProcessElement
+                  public void process(@DoFn.Element SpannerSchema schema) {
+                    TABLE_SCHEMAS.put(resultKey, schema);
+                  }
+                }))
+        .setCoder(StringUtf8Coder.of());
+    SpannerSchema finalSchemaObj;
+    try {
+      miniPipeline.run().waitUntilFinish();
+    } finally {
+      // Always drain the entry so that a failed run does not leave it in the static map.
+      finalSchemaObj = TABLE_SCHEMAS.remove(resultKey);
+    }
+    if (finalSchemaObj == null) {
+      throw new RuntimeException(
+          String.format("Could not get schema for configuration %s", config));
+    }
+    return spannerSchemaToBeamSchema(finalSchemaObj, config.getTable());
+  }
+
+  /**
+   * Builds the output schema for {@code tableName}, matched case-insensitively against the tables
+   * in {@code spannerSchema}.
+   *
+   * @throws IllegalArgumentException if no table matches
+   */
+  private static Schema spannerSchemaToBeamSchema(
+      SpannerSchema spannerSchema, final String tableName) {
+    OptionalInt optionalIdx =
+        IntStream.range(0, spannerSchema.getTables().size())
+            .filter(idx -> spannerSchema.getTables().get(idx).equalsIgnoreCase(tableName))
+            .findFirst();
+    if (!optionalIdx.isPresent()) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Unable to retrieve schema for table %s. Found only tables: [%s]",
+              tableName, String.join(", ", spannerSchema.getTables())));
+    }
+    Schema.Builder schemaBuilder = Schema.builder();
+
+    String spannerTableName = spannerSchema.getTables().get(optionalIdx.getAsInt());
+
+    for (SpannerSchema.Column col : spannerSchema.getColumns(spannerTableName)) {
+      schemaBuilder =
+          schemaBuilder.addNullableField(col.getName(), spannerTypeToBeamType(col.getType()));
+    }
+
+    schemaBuilder =
+        schemaBuilder.setOptions(
+            Schema.Options.builder()
+                .setOption(
+                    "primaryKeyColumns",
+                    Schema.FieldType.array(Schema.FieldType.STRING),
+                    spannerSchema.getKeyParts(spannerTableName).stream()
+                        .map(SpannerSchema.KeyPart::getField)
+                        .collect(Collectors.toList())));
+
+    return Schema.builder()
+        .addStringField("operation")
+        .addStringField("commitTimestamp")
+        .addInt64Field("recordSequence")
+        .addRowField("rowValues", schemaBuilder.build())
+        .build();
+  }
+
+  /**
+   * Converts a value decoded by Gson from change-stream JSON into the Java type Beam expects for
+   * {@code fieldType}.
+   *
+   * <p>Untyped Gson decoding yields {@code String}, {@code Double}, {@code Boolean} or {@code List}
+   * values. Arrays are converted element by element. Numbers bound for INT64 fields are converted
+   * numerically. All other scalars are rendered as strings and parsed by {@link
+   * #stringToParsedValue}.
+   */
+  private static Object jsonToParsedValue(Schema.FieldType fieldType, Object jsonValue) {
+    if (fieldType.getTypeName() == Schema.TypeName.ARRAY) {
+      if (!(jsonValue instanceof List)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Expected a JSON array for field with type %s, got %s", fieldType, jsonValue));
+      }
+      Schema.FieldType elementType = Objects.requireNonNull(fieldType.getCollectionElementType());
+      List<@Nullable Object> parsed = new ArrayList<>();
+      for (Object element : (List<?>) jsonValue) {
+        parsed.add(element == null ? null : jsonToParsedValue(elementType, element));
+      }
+      return parsed;
+    }
+    if (jsonValue instanceof Number && fieldType.getTypeName() == Schema.TypeName.INT64) {
+      // Gson decodes JSON numbers as Double, whose string form ("1.0") Long.valueOf rejects.
+      return ((Number) jsonValue).longValue();
+    }
+    return stringToParsedValue(fieldType, String.valueOf(jsonValue));
+  }
+
+  /**
+   * Parses the string form of a scalar change-stream value into the Java type Beam uses for
+   * {@code fieldType}.
+   *
+   * @throws IllegalArgumentException for unsupported field types
+   */
+  private static Object stringToParsedValue(Schema.FieldType fieldType, String fieldValue) {
+    switch (fieldType.getTypeName()) {
+      case STRING:
+        return fieldValue;
+      case INT64:
+        return Long.valueOf(fieldValue);
+      case INT16:
+        // Beam represents INT16 values as Short, not Integer.
+        return Short.valueOf(fieldValue);
+      case INT32:
+        return Integer.valueOf(fieldValue);
+      case FLOAT:
+        return Float.parseFloat(fieldValue);
+      case DOUBLE:
+        return Double.parseDouble(fieldValue);
+      case BOOLEAN:
+        return Boolean.parseBoolean(fieldValue);
+      case BYTES:
+        // Spanner encodes BYTES values as base64 strings in its JSON representation.
+        return Base64.getDecoder().decode(fieldValue.getBytes(StandardCharsets.UTF_8));
+      case DATETIME:
+        return new DateTime(fieldValue);
+      case DECIMAL:
+        return new BigDecimal(fieldValue);
+      default:
+        throw new IllegalArgumentException(
+            String.format("Unable to parse field with type %s", fieldType));
+    }
+  }
+
+  /**
+   * Maps a Spanner column type to a Beam field type; JSON and STRUCT are rejected.
+   *
+   * @throws IllegalArgumentException for unsupported Spanner types
+   */
+  private static Schema.FieldType spannerTypeToBeamType(Type spannerType) {
+    switch (spannerType.getCode()) {
+      case BOOL:
+        return Schema.FieldType.BOOLEAN;
+      case BYTES:
+        return Schema.FieldType.BYTES;
+      case STRING:
+        return Schema.FieldType.STRING;
+      case INT64:
+        return Schema.FieldType.INT64;
+      case NUMERIC:
+        return Schema.FieldType.DECIMAL;
+      case FLOAT64:
+        return Schema.FieldType.DOUBLE;
+      case TIMESTAMP:
+      case DATE:
+        return Schema.FieldType.DATETIME;
+      case ARRAY:
+        return Schema.FieldType.array(spannerTypeToBeamType(spannerType.getArrayElementType()));
+      case JSON:
+      case STRUCT:
+      default:
+        throw new IllegalArgumentException(
+            String.format("Unsupported spanner type: %s", spannerType));
+    }
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamsSchemaTransformIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamsSchemaTransformIT.java
new file mode 100644
index 00000000000..cae1b782eed
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamsSchemaTransformIT.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.it;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.ErrorCode;
+import com.google.cloud.spanner.Key;
+import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.Options;
+import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.SpannerException;
+import com.google.cloud.spanner.Statement;
+import java.util.Collections;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.SpannerChangestreamsReadSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.Select;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** End-to-end test of Cloud Spanner Source. */
+@RunWith(JUnit4.class)
+public class SpannerChangeStreamsSchemaTransformIT {
+
+ @ClassRule public static final IntegrationTestEnv ENV = new IntegrationTestEnv();
+ @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+ private static String instanceId;
+ private static String projectId;
+ private static String databaseId;
+ private static String metadataTableName;
+ private static String changeStreamTableName;
+ private static String changeStreamName;
+ private static DatabaseClient databaseClient;
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ projectId = ENV.getProjectId();
+ instanceId = ENV.getInstanceId();
+ databaseId = ENV.getDatabaseId();
+ metadataTableName = ENV.getMetadataTableName();
+ changeStreamTableName = ENV.createSingersTable();
+ changeStreamName = ENV.createChangeStreamFor(changeStreamTableName);
+ System.out.println(changeStreamName);
+ databaseClient = ENV.getDatabaseClient();
+ }
+
+ @Before
+ public void before() {
+ pipeline.getOptions().as(ChangeStreamTestPipelineOptions.class).setStreaming(true);
+ pipeline.getOptions().as(ChangeStreamTestPipelineOptions.class).setBlockOnRun(false);
+ }
+
+ @Test
+ public void testReadSpannerChangeStream() {
+ // Defines how many rows are going to be inserted / updated / deleted in the test
+ final int numRows = 5;
+ // Inserts numRows rows and uses the first commit timestamp as the startAt for reading the
+ // change stream
+ final Pair<Timestamp, Timestamp> insertTimestamps = insertRows(numRows);
+ final Timestamp startAt = insertTimestamps.getLeft();
+ // Updates the created rows
+ updateRows(numRows);
+ // Delete the created rows and uses the last commit timestamp as the endAt for reading the
+ // change stream
+ final Pair<Timestamp, Timestamp> deleteTimestamps = deleteRows(numRows);
+ final Timestamp endAt = deleteTimestamps.getRight();
+
+ final PCollection<Row> tokens =
+ PCollectionRowTuple.empty(pipeline)
+ .apply(
+ new SpannerChangestreamsReadSchemaTransformProvider()
+ .from(
+ SpannerChangestreamsReadSchemaTransformProvider
+ .SpannerChangestreamsReadConfiguration.builder()
+ .setDatabaseId(databaseId)
+ .setInstanceId(instanceId)
+ .setProjectId(projectId)
+ .setTable(changeStreamTableName)
+ .setChangeStreamName(changeStreamName)
+ .setStartAtTimestamp(startAt.toString())
+ .setEndAtTimestamp(endAt.toString())
+ .build())
+ .buildTransform())
+ .get("output")
+ .apply(
+ Window.<Row>into(new GlobalWindows())
+ .triggering(AfterWatermark.pastEndOfWindow())
+ .discardingFiredPanes());
+
+ assertEquals(
+ Schema.builder()
+ .addStringField("operation")
+ .addStringField("commitTimestamp")
+ .addInt64Field("recordSequence")
+ .addRowField(
+ "rowValues",
+ Schema.builder()
+ .addNullableField("singerid", Schema.FieldType.INT64)
+ .addNullableField("firstname", Schema.FieldType.STRING)
+ .addNullableField("lastname", Schema.FieldType.STRING)
+ .addNullableField("singerinfo", Schema.FieldType.BYTES)
+ .setOptions(
+ Schema.Options.builder()
+ .addOptions(
+ Schema.Options.builder()
+ .setOption(
+ "primaryKeyColumns",
+ Schema.FieldType.array(Schema.FieldType.STRING),
+ Collections.singletonList("singerid"))
+ .build())
+ .build())
+ .build())
+ .build(),
+ tokens.getSchema());
+
+ PAssert.that(tokens.apply(Select.fieldNames("operation")))
+ .containsInAnyOrder(
+ operationRow("INSERT"),
+ operationRow("INSERT"),
+ operationRow("INSERT"),
+ operationRow("INSERT"),
+ operationRow("INSERT"),
+ operationRow("UPDATE"),
+ operationRow("UPDATE"),
+ operationRow("UPDATE"),
+ operationRow("UPDATE"),
+ operationRow("UPDATE"),
+ operationRow("DELETE"),
+ operationRow("DELETE"),
+ operationRow("DELETE"),
+ operationRow("DELETE"),
+ operationRow("DELETE"));
+ pipeline.run().waitUntilFinish();
+
+ assertMetadataTableHasBeenDropped();
+ }
+
+ private Row operationRow(String operation) {
+ return Row.withSchema(Schema.builder().addField("operation", Schema.FieldType.STRING).build())
+ .addValue(operation)
+ .build();
+ }
+
+ private static void assertMetadataTableHasBeenDropped() {
+ try (ResultSet resultSet =
+ databaseClient
+ .singleUse()
+ .executeQuery(Statement.of("SELECT * FROM " + metadataTableName))) {
+ resultSet.next();
+ fail(
+ "The metadata table "
+ + metadataTableName
+ + " should had been dropped, but it still exists");
+ } catch (SpannerException e) {
+ assertEquals(ErrorCode.INVALID_ARGUMENT, e.getErrorCode());
+ assertTrue(
+ "Error message must contain \"Table not found\"",
+ e.getMessage().contains("Table not found"));
+ }
+ }
+
+ private static Pair<Timestamp, Timestamp> insertRows(int n) {
+ final Timestamp firstCommitTimestamp = insertRow(1);
+ for (int i = 2; i < n; i++) {
+ insertRow(i);
+ }
+ final Timestamp lastCommitTimestamp = insertRow(n);
+ return Pair.of(firstCommitTimestamp, lastCommitTimestamp);
+ }
+
+ private static Pair<Timestamp, Timestamp> updateRows(int n) {
+ final Timestamp firstCommitTimestamp = updateRow(1);
+ for (int i = 2; i < n; i++) {
+ updateRow(i);
+ }
+ final Timestamp lastCommitTimestamp = updateRow(n);
+ return Pair.of(firstCommitTimestamp, lastCommitTimestamp);
+ }
+
+ private static Pair<Timestamp, Timestamp> deleteRows(int n) {
+ final Timestamp firstCommitTimestamp = deleteRow(1);
+ for (int i = 2; i < n; i++) {
+ deleteRow(i);
+ }
+ final Timestamp lastCommitTimestamp = deleteRow(n);
+ return Pair.of(firstCommitTimestamp, lastCommitTimestamp);
+ }
+
+ private static Timestamp insertRow(int singerId) {
+ return databaseClient
+ .writeWithOptions(
+ Collections.singletonList(
+ Mutation.newInsertBuilder(changeStreamTableName)
+ .set("SingerId")
+ .to(singerId)
+ .set("FirstName")
+ .to("First Name " + singerId)
+ .set("LastName")
+ .to("Last Name " + singerId)
+ .build()),
+ Options.tag("app=beam;action=insert"))
+ .getCommitTimestamp();
+ }
+
+ private static Timestamp updateRow(int singerId) {
+ return databaseClient
+ .writeWithOptions(
+ Collections.singletonList(
+ Mutation.newUpdateBuilder(changeStreamTableName)
+ .set("SingerId")
+ .to(singerId)
+ .set("FirstName")
+ .to("Updated First Name " + singerId)
+ .set("LastName")
+ .to("Updated Last Name " + singerId)
+ .build()),
+ Options.tag("app=beam;action=update"))
+ .getCommitTimestamp();
+ }
+
+ private static Timestamp deleteRow(int singerId) {
+ return databaseClient
+ .writeWithOptions(
+ Collections.singletonList(Mutation.delete(changeStreamTableName, Key.of(singerId))),
+ Options.tag("app=beam;action=delete"))
+ .getCommitTimestamp();
+ }
+}