Posted to commits@beam.apache.org by pa...@apache.org on 2023/02/09 05:00:23 UTC

[beam] branch master updated: Adding SpannerIO.readChangeStreams support for SchemaTransform (#24999)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d36f34df570 Adding SpannerIO.readChangeStreams support for SchemaTransform (#24999)
d36f34df570 is described below

commit d36f34df570b845a5817ba38d5f16b5f12b2a825
Author: Pablo Estrada <pa...@users.noreply.github.com>
AuthorDate: Wed Feb 8 21:00:16 2023 -0800

    Adding SpannerIO.readChangeStreams support for SchemaTransform (#24999)
    
    * Adding SpannerIO.readChangeStreams support for SchemaTransform
    
    * Improve test and schema shape to include changestream information
    
    * add schema checks
    
    * fixup
    
    * remove unneeded file
    
    * reducing the number of tables that we cache
---
 .../beam/sdk/io/gcp/spanner/ReadSpannerSchema.java |  37 +-
 .../beam/sdk/io/gcp/spanner/SpannerSchema.java     |   8 +-
 ...erChangestreamsReadSchemaTransformProvider.java | 394 +++++++++++++++++++++
 .../it/SpannerChangeStreamsSchemaTransformIT.java  | 262 ++++++++++++++
 4 files changed, 695 insertions(+), 6 deletions(-)
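
For context, here is a minimal usage sketch of the new SchemaTransform, modeled
on the integration test added in this commit. The project, instance, database,
table, change stream name, and start timestamp below are placeholders, not
values from the commit:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.spanner.changestreams.SpannerChangestreamsReadSchemaTransformProvider;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionRowTuple;
    import org.apache.beam.sdk.values.Row;

    public class ChangeStreamReadSketch {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();
        // Read change records for a single table as Beam Rows.
        PCollection<Row> changeRecords =
            PCollectionRowTuple.empty(pipeline)
                .apply(
                    new SpannerChangestreamsReadSchemaTransformProvider()
                        .from(
                            SpannerChangestreamsReadSchemaTransformProvider
                                .SpannerChangestreamsReadConfiguration.builder()
                                .setProjectId("my-project")           // placeholder
                                .setInstanceId("my-instance")         // placeholder
                                .setDatabaseId("my-database")         // placeholder
                                .setTable("Singers")                  // placeholder
                                .setChangeStreamName("SingersStream") // placeholder
                                .setStartAtTimestamp("2023-02-09T00:00:00Z")
                                // setEndAtTimestamp is optional; omitting it
                                // yields an unbounded read.
                                .build())
                        .buildTransform())
                .get("output");
        pipeline.run();
      }
    }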

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java
index 9b30747f778..9c82701e5a4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java
@@ -22,6 +22,8 @@ import com.google.cloud.spanner.Dialect;
 import com.google.cloud.spanner.ReadOnlyTransaction;
 import com.google.cloud.spanner.ResultSet;
 import com.google.cloud.spanner.Statement;
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.PCollectionView;
 
@@ -32,17 +34,44 @@ import org.apache.beam.sdk.values.PCollectionView;
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
-class ReadSpannerSchema extends DoFn<Void, SpannerSchema> {
+public class ReadSpannerSchema extends DoFn<Void, SpannerSchema> {
 
   private final SpannerConfig config;
 
   private final PCollectionView<Dialect> dialectView;
 
+  private final Set<String> allowedTableNames;
+
   private transient SpannerAccessor spannerAccessor;
 
+  /**
+   * Constructor for creating an instance of the ReadSpannerSchema class. If no {@code
+   * allowedTableNames} set is passed, all tables are allowed.
+   *
+   * @param config The SpannerConfig object that contains the configuration for accessing the
+   *     Spanner database.
+   * @param dialectView A PCollectionView object that holds a Dialect object for the database
+   *     dialect to use for reading the Spanner schema.
+   */
   public ReadSpannerSchema(SpannerConfig config, PCollectionView<Dialect> dialectView) {
+    this(config, dialectView, new HashSet<String>());
+  }
+
+  /**
+   * Constructor for creating an instance of the ReadSpannerSchema class.
+   *
+   * @param config The SpannerConfig object that contains the configuration for accessing the
+   *     Spanner database.
+   * @param dialectView A PCollectionView object that holds a Dialect object for the database
+   *     dialect to use for reading the Spanner schema.
+   * @param allowedTableNames A set of allowed table names to be used when reading the Spanner
+   *     schema.
+   */
+  public ReadSpannerSchema(
+      SpannerConfig config, PCollectionView<Dialect> dialectView, Set<String> allowedTableNames) {
     this.config = config;
     this.dialectView = dialectView;
+    this.allowedTableNames = allowedTableNames == null ? new HashSet<>() : allowedTableNames;
   }
 
   @Setup
@@ -68,7 +97,11 @@ class ReadSpannerSchema extends DoFn<Void, SpannerSchema> {
         String columnName = resultSet.getString(1);
         String type = resultSet.getString(2);
         long cellsMutated = resultSet.getLong(3);
-
+        if (!allowedTableNames.isEmpty() && !allowedTableNames.contains(tableName)) {
+          // When an allow-list was provided, skip any table that is not part
+          // of it.
+          continue;
+        }
         builder.addColumn(tableName, columnName, type, cellsMutated);
       }
 
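The visibility widening in this file lets the new SchemaTransform provider,
added further below, read the schema of a single table. A minimal sketch of the
allow-list constructor in use, following the getTableSchema() helper later in
this commit (imports as in SpannerChangestreamsReadSchemaTransformProvider
below; spannerConfig and the "Singers" table name are placeholders):

    PCollectionView<Dialect> dialectView =
        pipeline
            .apply("Create Dialect", Create.of(Dialect.GOOGLE_STANDARD_SQL))
            .apply("Dialect to View", View.asSingleton());
    pipeline
        .apply(Create.of((Void) null))
        .apply(
            // Restrict the schema read to the single table of interest.
            ParDo.of(
                    new ReadSpannerSchema(
                        spannerConfig, dialectView, Sets.newHashSet("Singers")))
                .withSideInput("dialect", dialectView));
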
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java
index 78941d12a7f..09394d37a44 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java
@@ -34,7 +34,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
-abstract class SpannerSchema implements Serializable {
+public abstract class SpannerSchema implements Serializable {
   abstract ImmutableList<String> tables();
 
   abstract Dialect dialect();
@@ -135,18 +135,18 @@ abstract class SpannerSchema implements Serializable {
   }
 
   @AutoValue
-  abstract static class KeyPart implements Serializable {
+  public abstract static class KeyPart implements Serializable {
     static KeyPart create(String field, boolean desc) {
       return new AutoValue_SpannerSchema_KeyPart(field, desc);
     }
 
-    abstract String getField();
+    public abstract String getField();
 
     abstract boolean isDesc();
   }
 
   @AutoValue
-  abstract static class Column implements Serializable {
+  public abstract static class Column implements Serializable {
 
     static Column create(String name, Type type) {
       return new AutoValue_SpannerSchema_Column(name, type);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider.java
new file mode 100644
index 00000000000..150895c19e3
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Dialect;
+import com.google.cloud.spanner.Type;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.gcp.spanner.ReadSpannerSchema;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerSchema;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.gson.Gson;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+
+public class SpannerChangestreamsReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<
+        SpannerChangestreamsReadSchemaTransformProvider.SpannerChangestreamsReadConfiguration> {
+  @Override
+  protected @UnknownKeyFor @NonNull @Initialized Class<SpannerChangestreamsReadConfiguration>
+      configurationClass() {
+    return SpannerChangestreamsReadConfiguration.class;
+  }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+      SpannerChangestreamsReadSchemaTransformProvider.SpannerChangestreamsReadConfiguration
+          configuration) {
+    return new SchemaTransform() {
+      @Override
+      public @UnknownKeyFor @NonNull @Initialized PTransform<
+              @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple,
+              @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple>
+          buildTransform() {
+        return new PTransform<PCollectionRowTuple, PCollectionRowTuple>() {
+          @Override
+          public PCollectionRowTuple expand(PCollectionRowTuple input) {
+            Pipeline p = input.getPipeline();
+            // TODO(pabloem): Does this action create/destroy a new metadata table??
+            Schema tableChangesSchema = getTableSchema(configuration);
+            SpannerIO.ReadChangeStream readChangeStream =
+                SpannerIO.readChangeStream()
+                    .withSpannerConfig(
+                        SpannerConfig.create()
+                            .withProjectId(configuration.getProjectId())
+                            .withInstanceId(configuration.getInstanceId())
+                            .withDatabaseId(configuration.getDatabaseId()))
+                    .withChangeStreamName(configuration.getChangeStreamName())
+                    .withInclusiveStartAt(
+                        Timestamp.parseTimestamp(configuration.getStartAtTimestamp()))
+                    .withDatabaseId(configuration.getDatabaseId())
+                    .withProjectId(configuration.getProjectId())
+                    .withInstanceId(configuration.getInstanceId());
+
+            if (configuration.getEndAtTimestamp() != null) {
+              String endTs =
+                  Objects.requireNonNull(configuration.getEndAtTimestamp());
+              readChangeStream =
+                  readChangeStream.withInclusiveEndAt(Timestamp.parseTimestamp(endTs));
+            }
+            return PCollectionRowTuple.of(
+                "output",
+                p.apply(readChangeStream)
+                    .apply(
+                        ParDo.of(
+                            new DataChangeRecordToRow(
+                                configuration.getTable(), tableChangesSchema)))
+                    .setRowSchema(tableChangesSchema));
+          }
+        };
+      }
+    };
+  }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized String identifier() {
+    return "beam:schematransform:org.apache.beam:spanner_cdc_read:v1";
+  }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
+      inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
+      outputCollectionNames() {
+    return Collections.singletonList("output");
+  }
+
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class SpannerChangestreamsReadConfiguration implements Serializable {
+    public abstract String getDatabaseId();
+
+    public abstract String getProjectId();
+
+    public abstract String getInstanceId();
+
+    public abstract String getTable();
+
+    public abstract String getStartAtTimestamp();
+
+    public abstract @Nullable String getEndAtTimestamp();
+
+    public abstract String getChangeStreamName();
+
+    public static Builder builder() {
+      return new AutoValue_SpannerChangestreamsReadSchemaTransformProvider_SpannerChangestreamsReadConfiguration
+          .Builder();
+    }
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setDatabaseId(String databaseId);
+
+      public abstract Builder setProjectId(String projectId);
+
+      public abstract Builder setInstanceId(String instanceId);
+
+      public abstract Builder setTable(String table);
+
+      public abstract Builder setStartAtTimestamp(String isoTimestamp);
+
+      public abstract Builder setEndAtTimestamp(String isoTimestamp);
+
+      public abstract Builder setChangeStreamName(String changeStreamName);
+
+      public abstract SpannerChangestreamsReadConfiguration build();
+    }
+  }
+
+  private static final class DataChangeRecordToRow extends DoFn<DataChangeRecord, Row> {
+    private final Schema tableChangeRecordSchema;
+    private final String tableName;
+    private transient Gson gson;
+
+    DataChangeRecordToRow(String tableName, Schema tableChangeRecordSchema) {
+      this.tableName = tableName;
+      this.tableChangeRecordSchema = tableChangeRecordSchema;
+      this.gson = new Gson();
+    }
+
+    public Gson getGson() {
+      if (gson == null) {
+        gson = new Gson();
+      }
+      return gson;
+    }
+
+    @ProcessElement
+    public void process(@DoFn.Element DataChangeRecord record, OutputReceiver<Row> receiver) {
+      if (!record.getTableName().equalsIgnoreCase(tableName)) {
+        // If the element does not belong to the appropriate table name, we discard it.
+        return;
+      }
+      final Instant timestamp = new Instant(record.getRecordTimestamp().toSqlTimestamp());
+
+      for (Mod mod : record.getMods()) {
+        Schema internalRowSchema =
+            tableChangeRecordSchema.getField("rowValues").getType().getRowSchema();
+        if (internalRowSchema == null) {
+          throw new RuntimeException("Row schema for internal row is null and cannot be utilized.");
+        }
+        Row.FieldValueBuilder rowBuilder = Row.fromRow(Row.nullRow(internalRowSchema));
+        final Map<String, String> newValues =
+            Optional.ofNullable(mod.getNewValuesJson())
+                .map(nonNullValues -> getGson().fromJson(nonNullValues, Map.class))
+                .orElseGet(Collections::emptyMap);
+        final Map<String, String> keyValues =
+            Optional.ofNullable(mod.getKeysJson())
+                .map(nonNullValues -> getGson().fromJson(nonNullValues, Map.class))
+                .orElseGet(Collections::emptyMap);
+
+        for (Map.Entry<String, String> valueEntry : newValues.entrySet()) {
+          if (valueEntry.getValue() == null) {
+            continue;
+          }
+          // TODO(pabloem): Understand why SpannerSchema has field names in lowercase...
+          rowBuilder =
+              rowBuilder.withFieldValue(
+                  valueEntry.getKey().toLowerCase(),
+                  stringToParsedValue(
+                      internalRowSchema.getField(valueEntry.getKey().toLowerCase()).getType(),
+                      valueEntry.getValue()));
+        }
+
+        for (Map.Entry<String, String> pkEntry : keyValues.entrySet()) {
+          if (pkEntry.getValue() == null) {
+            continue;
+          }
+          // TODO(pabloem): Understand why SpannerSchema has field names in lowercase...
+          rowBuilder =
+              rowBuilder.withFieldValue(
+                  pkEntry.getKey().toLowerCase(),
+                  stringToParsedValue(
+                      internalRowSchema.getField(pkEntry.getKey().toLowerCase()).getType(),
+                      pkEntry.getValue()));
+        }
+        receiver.outputWithTimestamp(
+            Row.withSchema(tableChangeRecordSchema)
+                .addValue(record.getModType().toString())
+                .addValue(record.getCommitTimestamp().toString())
+                .addValue(Long.parseLong(record.getRecordSequence()))
+                .addValue(rowBuilder.build())
+                .build(),
+            timestamp);
+      }
+    }
+  }
+
+  private static final HashMap<String, SpannerSchema> TABLE_SCHEMAS = new HashMap<>();
+
+  private static Schema getTableSchema(SpannerChangestreamsReadConfiguration config) {
+    Pipeline miniPipeline = Pipeline.create();
+    PCollectionView<Dialect> sqlDialectView =
+        miniPipeline
+            .apply("Create Dialect", Create.of(Dialect.GOOGLE_STANDARD_SQL))
+            .apply("Dialect to View", View.asSingleton());
+    miniPipeline
+        .apply(Create.of((Void) null))
+        .apply(
+            ParDo.of(
+                    new ReadSpannerSchema(
+                        SpannerConfig.create()
+                            .withDatabaseId(config.getDatabaseId())
+                            .withInstanceId(config.getInstanceId())
+                            .withProjectId(config.getProjectId()),
+                        sqlDialectView,
+                        Sets.newHashSet(config.getTable())))
+                .withSideInput("dialect", sqlDialectView))
+        .apply(
+            ParDo.of(
+                new DoFn<SpannerSchema, String>() {
+                  @ProcessElement
+                  public void process(@DoFn.Element SpannerSchema schema) {
+                    TABLE_SCHEMAS.put(config.getTable(), schema);
+                  }
+                }))
+        .setCoder(StringUtf8Coder.of());
+    miniPipeline.run().waitUntilFinish();
+    // Remove the schema from the static map now that the mini pipeline has run.
+    SpannerSchema finalSchemaObj = TABLE_SCHEMAS.remove(config.getTable());
+    if (finalSchemaObj == null) {
+      throw new RuntimeException(
+          String.format("Could not get schema for configuration %s", config));
+    }
+    return spannerSchemaToBeamSchema(finalSchemaObj, config.getTable());
+  }
+
+  private static Schema spannerSchemaToBeamSchema(
+      SpannerSchema spannerSchema, final String tableName) {
+    OptionalInt optionalIdx =
+        IntStream.range(0, spannerSchema.getTables().size())
+            .filter(idx -> spannerSchema.getTables().get(idx).equalsIgnoreCase(tableName))
+            .findAny();
+    if (!optionalIdx.isPresent()) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Unable to retrieve schema for table %s. Found only tables: [%s]",
+              tableName, String.join(", ", spannerSchema.getTables())));
+    }
+    Schema.Builder schemaBuilder = Schema.builder();
+
+    String spannerTableName = spannerSchema.getTables().get(optionalIdx.getAsInt());
+
+    for (SpannerSchema.Column col : spannerSchema.getColumns(spannerTableName)) {
+      schemaBuilder =
+          schemaBuilder.addNullableField(col.getName(), spannerTypeToBeamType(col.getType()));
+    }
+
+    schemaBuilder =
+        schemaBuilder.setOptions(
+            Schema.Options.builder()
+                .setOption(
+                    "primaryKeyColumns",
+                    Schema.FieldType.array(Schema.FieldType.STRING),
+                    spannerSchema.getKeyParts(spannerTableName).stream()
+                        .map(SpannerSchema.KeyPart::getField)
+                        .collect(Collectors.toList())));
+
+    return Schema.builder()
+        .addStringField("operation")
+        .addStringField("commitTimestamp")
+        .addInt64Field("recordSequence")
+        .addRowField("rowValues", schemaBuilder.build())
+        .build();
+  }
+
+  private static Object stringToParsedValue(Schema.FieldType fieldType, String fieldValue) {
+    switch (fieldType.getTypeName()) {
+      case STRING:
+        return fieldValue;
+      case INT64:
+        return Long.valueOf(fieldValue);
+      case INT16:
+      case INT32:
+        return Integer.valueOf(fieldValue);
+      case FLOAT:
+        return Float.parseFloat(fieldValue);
+      case DOUBLE:
+        return Double.parseDouble(fieldValue);
+      case BOOLEAN:
+        return Boolean.parseBoolean(fieldValue);
+      case BYTES:
+        return fieldValue.getBytes(StandardCharsets.UTF_8);
+      case DATETIME:
+        return new DateTime(fieldValue);
+      case DECIMAL:
+        return new BigDecimal(fieldValue);
+      default:
+        throw new IllegalArgumentException(
+            String.format("Unable to parse field with type %s", fieldType));
+    }
+  }
+
+  private static Schema.FieldType spannerTypeToBeamType(Type spannerType) {
+    switch (spannerType.getCode()) {
+      case BOOL:
+        return Schema.FieldType.BOOLEAN;
+      case BYTES:
+        return Schema.FieldType.BYTES;
+      case STRING:
+        return Schema.FieldType.STRING;
+      case INT64:
+        return Schema.FieldType.INT64;
+      case NUMERIC:
+        return Schema.FieldType.DECIMAL;
+      case FLOAT64:
+        return Schema.FieldType.DOUBLE;
+      case TIMESTAMP:
+      case DATE:
+        return Schema.FieldType.DATETIME;
+      case ARRAY:
+        return Schema.FieldType.array(spannerTypeToBeamType(spannerType.getArrayElementType()));
+      case JSON:
+      case STRUCT:
+      default:
+        throw new IllegalArgumentException(
+            String.format("Unsupported spanner type: %s", spannerType));
+    }
+  }
+}
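
Putting the schema pieces together: each Mod that matches the configured table
is emitted as a Row with a fixed top-level shape, and the table's columns are
nested under rowValues. A sketch of the resulting schema for the Singers table
used in the integration test below (field names and types taken from the test's
assertion; the singerinfo BYTES column belongs to that test table):

    import org.apache.beam.sdk.schemas.Schema;

    Schema rowValuesSchema =
        Schema.builder()
            .addNullableField("singerid", Schema.FieldType.INT64)
            .addNullableField("firstname", Schema.FieldType.STRING)
            .addNullableField("lastname", Schema.FieldType.STRING)
            .addNullableField("singerinfo", Schema.FieldType.BYTES)
            .build();
    Schema outputSchema =
        Schema.builder()
            .addStringField("operation")       // INSERT, UPDATE, or DELETE
            .addStringField("commitTimestamp")
            .addInt64Field("recordSequence")
            .addRowField("rowValues", rowValuesSchema)
            .build();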
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamsSchemaTransformIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamsSchemaTransformIT.java
new file mode 100644
index 00000000000..cae1b782eed
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamsSchemaTransformIT.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.it;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.ErrorCode;
+import com.google.cloud.spanner.Key;
+import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.Options;
+import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.SpannerException;
+import com.google.cloud.spanner.Statement;
+import java.util.Collections;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.SpannerChangestreamsReadSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.Select;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** End-to-end test of the Cloud Spanner change stream SchemaTransform. */
+@RunWith(JUnit4.class)
+public class SpannerChangeStreamsSchemaTransformIT {
+
+  @ClassRule public static final IntegrationTestEnv ENV = new IntegrationTestEnv();
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+  private static String instanceId;
+  private static String projectId;
+  private static String databaseId;
+  private static String metadataTableName;
+  private static String changeStreamTableName;
+  private static String changeStreamName;
+  private static DatabaseClient databaseClient;
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    projectId = ENV.getProjectId();
+    instanceId = ENV.getInstanceId();
+    databaseId = ENV.getDatabaseId();
+    metadataTableName = ENV.getMetadataTableName();
+    changeStreamTableName = ENV.createSingersTable();
+    changeStreamName = ENV.createChangeStreamFor(changeStreamTableName);
+    // Note: the generated change stream name is environment-specific.
+    databaseClient = ENV.getDatabaseClient();
+  }
+
+  @Before
+  public void before() {
+    pipeline.getOptions().as(ChangeStreamTestPipelineOptions.class).setStreaming(true);
+    pipeline.getOptions().as(ChangeStreamTestPipelineOptions.class).setBlockOnRun(false);
+  }
+
+  @Test
+  public void testReadSpannerChangeStream() {
+    // Defines how many rows are going to be inserted / updated / deleted in the test
+    final int numRows = 5;
+    // Inserts numRows rows and uses the first commit timestamp as the startAt for reading the
+    // change stream
+    final Pair<Timestamp, Timestamp> insertTimestamps = insertRows(numRows);
+    final Timestamp startAt = insertTimestamps.getLeft();
+    // Updates the created rows
+    updateRows(numRows);
+    // Deletes the created rows and uses the last commit timestamp as the endAt for reading the
+    // change stream
+    final Pair<Timestamp, Timestamp> deleteTimestamps = deleteRows(numRows);
+    final Timestamp endAt = deleteTimestamps.getRight();
+
+    final PCollection<Row> tokens =
+        PCollectionRowTuple.empty(pipeline)
+            .apply(
+                new SpannerChangestreamsReadSchemaTransformProvider()
+                    .from(
+                        SpannerChangestreamsReadSchemaTransformProvider
+                            .SpannerChangestreamsReadConfiguration.builder()
+                            .setDatabaseId(databaseId)
+                            .setInstanceId(instanceId)
+                            .setProjectId(projectId)
+                            .setTable(changeStreamTableName)
+                            .setChangeStreamName(changeStreamName)
+                            .setStartAtTimestamp(startAt.toString())
+                            .setEndAtTimestamp(endAt.toString())
+                            .build())
+                    .buildTransform())
+            .get("output")
+            .apply(
+                Window.<Row>into(new GlobalWindows())
+                    .triggering(AfterWatermark.pastEndOfWindow())
+                    .discardingFiredPanes());
+
+    assertEquals(
+        Schema.builder()
+            .addStringField("operation")
+            .addStringField("commitTimestamp")
+            .addInt64Field("recordSequence")
+            .addRowField(
+                "rowValues",
+                Schema.builder()
+                    .addNullableField("singerid", Schema.FieldType.INT64)
+                    .addNullableField("firstname", Schema.FieldType.STRING)
+                    .addNullableField("lastname", Schema.FieldType.STRING)
+                    .addNullableField("singerinfo", Schema.FieldType.BYTES)
+                    .setOptions(
+                        Schema.Options.builder()
+                            .addOptions(
+                                Schema.Options.builder()
+                                    .setOption(
+                                        "primaryKeyColumns",
+                                        Schema.FieldType.array(Schema.FieldType.STRING),
+                                        Collections.singletonList("singerid"))
+                                    .build())
+                            .build())
+                    .build())
+            .build(),
+        tokens.getSchema());
+
+    PAssert.that(tokens.apply(Select.fieldNames("operation")))
+        .containsInAnyOrder(
+            operationRow("INSERT"),
+            operationRow("INSERT"),
+            operationRow("INSERT"),
+            operationRow("INSERT"),
+            operationRow("INSERT"),
+            operationRow("UPDATE"),
+            operationRow("UPDATE"),
+            operationRow("UPDATE"),
+            operationRow("UPDATE"),
+            operationRow("UPDATE"),
+            operationRow("DELETE"),
+            operationRow("DELETE"),
+            operationRow("DELETE"),
+            operationRow("DELETE"),
+            operationRow("DELETE"));
+    pipeline.run().waitUntilFinish();
+
+    assertMetadataTableHasBeenDropped();
+  }
+
+  private Row operationRow(String operation) {
+    return Row.withSchema(Schema.builder().addField("operation", Schema.FieldType.STRING).build())
+        .addValue(operation)
+        .build();
+  }
+
+  private static void assertMetadataTableHasBeenDropped() {
+    try (ResultSet resultSet =
+        databaseClient
+            .singleUse()
+            .executeQuery(Statement.of("SELECT * FROM " + metadataTableName))) {
+      resultSet.next();
+      fail(
+          "The metadata table "
+              + metadataTableName
+              + " should had been dropped, but it still exists");
+    } catch (SpannerException e) {
+      assertEquals(ErrorCode.INVALID_ARGUMENT, e.getErrorCode());
+      assertTrue(
+          "Error message must contain \"Table not found\"",
+          e.getMessage().contains("Table not found"));
+    }
+  }
+
+  private static Pair<Timestamp, Timestamp> insertRows(int n) {
+    final Timestamp firstCommitTimestamp = insertRow(1);
+    for (int i = 2; i < n; i++) {
+      insertRow(i);
+    }
+    final Timestamp lastCommitTimestamp = insertRow(n);
+    return Pair.of(firstCommitTimestamp, lastCommitTimestamp);
+  }
+
+  private static Pair<Timestamp, Timestamp> updateRows(int n) {
+    final Timestamp firstCommitTimestamp = updateRow(1);
+    for (int i = 2; i < n; i++) {
+      updateRow(i);
+    }
+    final Timestamp lastCommitTimestamp = updateRow(n);
+    return Pair.of(firstCommitTimestamp, lastCommitTimestamp);
+  }
+
+  private static Pair<Timestamp, Timestamp> deleteRows(int n) {
+    final Timestamp firstCommitTimestamp = deleteRow(1);
+    for (int i = 2; i < n; i++) {
+      deleteRow(i);
+    }
+    final Timestamp lastCommitTimestamp = deleteRow(n);
+    return Pair.of(firstCommitTimestamp, lastCommitTimestamp);
+  }
+
+  private static Timestamp insertRow(int singerId) {
+    return databaseClient
+        .writeWithOptions(
+            Collections.singletonList(
+                Mutation.newInsertBuilder(changeStreamTableName)
+                    .set("SingerId")
+                    .to(singerId)
+                    .set("FirstName")
+                    .to("First Name " + singerId)
+                    .set("LastName")
+                    .to("Last Name " + singerId)
+                    .build()),
+            Options.tag("app=beam;action=insert"))
+        .getCommitTimestamp();
+  }
+
+  private static Timestamp updateRow(int singerId) {
+    return databaseClient
+        .writeWithOptions(
+            Collections.singletonList(
+                Mutation.newUpdateBuilder(changeStreamTableName)
+                    .set("SingerId")
+                    .to(singerId)
+                    .set("FirstName")
+                    .to("Updated First Name " + singerId)
+                    .set("LastName")
+                    .to("Updated Last Name " + singerId)
+                    .build()),
+            Options.tag("app=beam;action=update"))
+        .getCommitTimestamp();
+  }
+
+  private static Timestamp deleteRow(int singerId) {
+    return databaseClient
+        .writeWithOptions(
+            Collections.singletonList(Mutation.delete(changeStreamTableName, Key.of(singerId))),
+            Options.tag("app=beam;action=delete"))
+        .getCommitTimestamp();
+  }
+}