You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/24 21:13:02 UTC

[4/7] beam git commit: Initial implementation of SpannerIO.Write

Initial implementation of SpannerIO.Write

This closes #2166.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9cd6c3bd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9cd6c3bd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9cd6c3bd

Branch: refs/heads/master
Commit: 9cd6c3bdefd50473f9fc6fac359213dcd6b4e4d4
Parents: 0637df1
Author: MOLIG004 <gu...@disney.com>
Authored: Thu May 4 09:21:23 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed May 24 14:12:52 2017 -0700

----------------------------------------------------------------------
 .../beam/examples/spanner/SpannerCSVLoader.java | 143 ++++++++++
 pom.xml                                         |   8 +
 sdks/java/io/google-cloud-platform/pom.xml      |  33 ++-
 .../beam/sdk/io/gcp/spanner/SpannerIO.java      | 272 +++++++++++++++++++
 .../beam/sdk/io/gcp/spanner/package-info.java   |  23 ++
 .../beam/sdk/io/gcp/GcpApiSurfaceTest.java      |   2 +
 6 files changed, 471 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9cd6c3bd/examples/java/src/main/java/org/apache/beam/examples/spanner/SpannerCSVLoader.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/spanner/SpannerCSVLoader.java b/examples/java/src/main/java/org/apache/beam/examples/spanner/SpannerCSVLoader.java
new file mode 100644
index 0000000..eee581d
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/spanner/SpannerCSVLoader.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.spanner;
+
+import com.google.cloud.spanner.Database;
+import com.google.cloud.spanner.DatabaseAdminClient;
+import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.Operation;
+import com.google.cloud.spanner.Spanner;
+import com.google.cloud.spanner.SpannerException;
+import com.google.cloud.spanner.SpannerOptions;
+import com.google.spanner.admin.database.v1.CreateDatabaseMetadata;
+import java.util.Collections;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+
+
+
+/**
+ * Generalized bulk loader for importing CSV files into Spanner.
+ *
+ */
+public class SpannerCSVLoader {
+
+    /**
+     * Command options specification.
+     */
+    private interface Options extends PipelineOptions {
+    @Description("Create a sample database")
+    @Default.Boolean(false)
+    boolean isCreateDatabase();
+    void setCreateDatabase(boolean createDatabase);
+
+    @Description("File to read from ")
+    @Validation.Required
+    String getInput();
+    void setInput(String value);
+
+    @Description("Instance ID to write to in Spanner")
+    @Validation.Required
+    String getInstanceId();
+    void setInstanceId(String value);
+
+    @Description("Database ID to write to in Spanner")
+    @Validation.Required
+    String getDatabaseId();
+    void setDatabaseId(String value);
+
+    @Description("Table name")
+    @Validation.Required
+    String getTable();
+    void setTable(String value);
+  }
+
+
+  /**
+   * Constructs and executes the processing pipeline based upon command options.
+   */
+  public static void main(String[] args) throws Exception {
+      Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+
+      Pipeline p = Pipeline.create(options);
+      PCollection<String> lines = p.apply(TextIO.Read.from(options.getInput()));
+      PCollection<Mutation> mutations = lines
+              .apply(ParDo.of(new NaiveParseCsvFn(options.getTable())));
+      mutations
+              .apply(SpannerIO.writeTo(options.getInstanceId(), options.getDatabaseId()));
+      p.run().waitUntilFinish();
+  }
+
+  public static void createDatabase(Options options) {
+      Spanner client = SpannerOptions.getDefaultInstance().getService();
+
+      DatabaseAdminClient databaseAdminClient = client.getDatabaseAdminClient();
+      try {
+          databaseAdminClient.dropDatabase(options.getInstanceId(), options
+                  .getDatabaseId());
+      } catch (SpannerException e) {
+          // Does not exist, ignore.
+      }
+      Operation<Database, CreateDatabaseMetadata> op = databaseAdminClient.createDatabase(
+               options.getInstanceId(), options
+              .getDatabaseId(), Collections.singleton("CREATE TABLE " + options.getTable() + " ("
+              + "  Key           INT64,"
+              + "  Name          STRING,"
+              + "  Email         STRING,"
+              + "  Age           INT,"
+              + ") PRIMARY KEY (Key)"));
+      op.waitFor();
+  }
+
+
+  /**
+   * A DoFn that creates a Spanner Mutation for each CSV line.
+   */
+  static class NaiveParseCsvFn extends DoFn<String, Mutation> {
+      private final String table;
+
+      NaiveParseCsvFn(String table) {
+          this.table = table;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+          String line = c.element();
+          String[] elements = line.split(",");
+          if (elements.length != 4) {
+              return;
+          }
+          Mutation mutation = Mutation.newInsertOrUpdateBuilder(table)
+                  .set("Key").to(Long.valueOf(elements[0]))
+                  .set("Name").to(elements[1])
+                  .set("Email").to(elements[2])
+                  .set("Age").to(Integer.valueOf(elements[3]))
+                  .build();
+          c.output(mutation);
+      }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/9cd6c3bd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index cc2e483..3a6289d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,6 +104,7 @@
     <apache.commons.lang.version>3.5</apache.commons.lang.version>
     <apache.commons.compress.version>1.9</apache.commons.compress.version>
     <apex.kryo.version>2.24.0</apex.kryo.version>
+    <api-common.version>1.0.0-rc2</api-common.version>
     <avro.version>1.8.1</avro.version>
     <bigquery.version>v2-rev295-1.22.0</bigquery.version>
     <bigtable.version>0.9.6.2</bigtable.version>
@@ -139,6 +140,7 @@
     <stax2.version>3.1.4</stax2.version>
     <storage.version>v1-rev71-1.22.0</storage.version>
     <woodstox.version>4.4.1</woodstox.version>
+    <spanner.version>0.16.0-beta</spanner.version>
     <spring.version>4.3.5.RELEASE</spring.version>
     <groovy-maven-plugin.version>2.0</groovy-maven-plugin.version>
     <snappy-java.version>1.1.4-M3</snappy-java.version>
@@ -866,6 +868,12 @@
       </dependency>
 
       <dependency>
+        <groupId>com.google.cloud</groupId>
+        <artifactId>google-cloud-spanner</artifactId>
+        <version>${spanner.version}</version>
+      </dependency>
+
+      <dependency>
         <groupId>com.google.cloud.bigdataoss</groupId>
         <artifactId>util</artifactId>
         <version>${google-cloud-bigdataoss.version}</version>

http://git-wip-us.apache.org/repos/asf/beam/blob/9cd6c3bd/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index ea2d8f0..2181895 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -82,11 +82,28 @@
     </dependency>
 
     <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.api.grpc</groupId>
+      <artifactId>grpc-google-common-protos</artifactId>
+      <version>${grpc-google-common-protos.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.apis</groupId>
       <artifactId>google-api-services-bigquery</artifactId>
     </dependency>
 
     <dependency>
+      <groupId>com.google.api</groupId>
+      <artifactId>api-common</artifactId>
+      <version>${api-common.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.apis</groupId>
       <artifactId>google-api-services-pubsub</artifactId>
     </dependency>
@@ -118,11 +135,6 @@
 
     <dependency>
       <groupId>io.grpc</groupId>
-      <artifactId>grpc-core</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>io.grpc</groupId>
       <artifactId>grpc-netty</artifactId>
     </dependency>
 
@@ -150,6 +162,12 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.cloud</groupId>
+      <artifactId>google-cloud-spanner</artifactId>
+      <version>${spanner.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.cloud.bigtable</groupId>
       <artifactId>bigtable-protos</artifactId>
     </dependency>
@@ -185,11 +203,6 @@
     </dependency>
 
     <dependency>
-      <groupId>com.google.api.grpc</groupId>
-      <artifactId>grpc-google-common-protos</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/9cd6c3bd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
new file mode 100644
index 0000000..172ed8f
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+
+import com.google.cloud.spanner.AbortedException;
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.DatabaseId;
+import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.Spanner;
+import com.google.cloud.spanner.SpannerOptions;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+
+/**
+ *  <a href="https://cloud.google.com/spanner/">Google Cloud Spanner</a> connectors.
+ *
+ * <h3>Reading from Cloud Spanner</h3>
+ * <strong>Status: Not implemented.</strong>
+ *
+ * <h3>Writing to Cloud Spanner</h3>
+ * <strong>Status: Experimental.</strong>
+ *
+ * <p>{@link SpannerIO#writeTo} batches together and concurrently writes a set of {@link Mutation}s.
+ * To configure Cloud Spanner sink, you must apply {@link SpannerIO#writeTo} transform to
+ * {@link PCollection} and specify instance and database identifiers.
+ * For example, following code sketches out a pipeline that imports data from the CSV file to Cloud
+ * Spanner.
+ *
+ * <pre>{@code
+ *
+ * Pipeline p = ...;
+ * // Read the CSV file.
+ * PCollection<String> lines = p.apply("Read CSV file", TextIO.Read.from(options.getInput()));
+ * // Parse the line and convert to mutation.
+ * PCollection<Mutation> mutations = lines.apply("Parse CSV", parseFromCsv());
+ * // Write mutations.
+ * mutations.apply("Write", SpannerIO.writeTo(options.getInstanceId(), options.getDatabaseId()));
+ * p.run();
+ *
+ * }</pre>
+
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class SpannerIO {
+
+  private SpannerIO() {
+  }
+
+  @VisibleForTesting
+  static final int SPANNER_MUTATIONS_PER_COMMIT_LIMIT = 20000;
+
+    /**
+     * Creates an instance of {@link Writer}. Use {@link Writer#withBatchSize} to limit the batch
+     * size.
+     */
+  public static Writer writeTo(String instanceId, String databaseId) {
+    return new Writer(instanceId, databaseId, SPANNER_MUTATIONS_PER_COMMIT_LIMIT);
+  }
+
+  /**
+   * A {@link PTransform} that writes {@link Mutation} objects to Cloud Spanner.
+   *
+   * @see SpannerIO
+   */
+  public static class Writer extends PTransform<PCollection<Mutation>, PDone> {
+
+    private final String instanceId;
+    private final String databaseId;
+    private int batchSize;
+
+    Writer(String instanceId, String databaseId, int batchSize) {
+      this.instanceId = instanceId;
+      this.databaseId = databaseId;
+      this.batchSize = batchSize;
+    }
+
+    /**
+     * Returns a new {@link Writer} with a limit on the number of mutations per batch.
+     * Defaults to {@link SpannerIO#SPANNER_MUTATIONS_PER_COMMIT_LIMIT}.
+     */
+    public Writer withBatchSize(Integer batchSize) {
+      return new Writer(instanceId, databaseId, batchSize);
+    }
+
+    @Override
+    public PDone expand(PCollection<Mutation> input) {
+      input.apply("Write mutations to Spanner", ParDo.of(
+              new SpannerWriterFn(instanceId, databaseId, batchSize)));
+
+      return PDone.in(input.getPipeline());
+    }
+
+    @Override
+    public void validate(PCollection<Mutation> input) {
+      checkNotNull(instanceId, "instanceId");
+      checkNotNull(databaseId, "databaseId");
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(getClass())
+          .add("instanceId", instanceId)
+          .add("databaseId", databaseId)
+          .toString();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .addIfNotNull(DisplayData.item("instanceId", instanceId)
+              .withLabel("Output Instance"))
+          .addIfNotNull(DisplayData.item("databaseId", databaseId)
+              .withLabel("Output Database"));
+    }
+
+  }
+
+
+  /**
+   * {@link DoFn} that writes {@link Mutation}s to Cloud Spanner. Mutations are written in
+   * batches, where the maximum batch size is {@link SpannerIO#SPANNER_MUTATIONS_PER_COMMIT_LIMIT}.
+   *
+   * <p>See <a href="https://cloud.google.com/spanner"/>
+   *
+   * <p>Commits are non-transactional.  If a commit fails, it will be retried (up to
+   * {@link SpannerIO#MAX_RETRIES}. times). This means that the
+   * mutation operation should be idempotent.
+   */
+  @VisibleForTesting
+  static class SpannerWriterFn extends DoFn<Mutation, Void> {
+    private static final Logger LOG = LoggerFactory.getLogger(SpannerWriterFn.class);
+    private transient Spanner spanner;
+    private final String instanceId;
+    private final String databaseId;
+    private final int batchSize;
+    private transient DatabaseClient dbClient;
+    // Current batch of mutations to be written.
+    private final List<Mutation> mutations = new ArrayList<>();
+
+    private static final int MAX_RETRIES = 5;
+    private static final FluentBackoff BUNDLE_WRITE_BACKOFF =
+        FluentBackoff.DEFAULT
+            .withMaxRetries(MAX_RETRIES).withInitialBackoff(Duration.standardSeconds(5));
+
+    @VisibleForTesting
+    SpannerWriterFn(String instanceId, String databaseId, int batchSize) {
+      this.instanceId = checkNotNull(instanceId, "instanceId");
+      this.databaseId = checkNotNull(databaseId, "databaseId");
+      this.batchSize = batchSize;
+    }
+
+    @Setup
+    public void setup() throws Exception {
+        SpannerOptions options = SpannerOptions.newBuilder().build();
+        spanner = options.getService();
+        dbClient = spanner.getDatabaseClient(
+            DatabaseId.of(options.getProjectId(), instanceId, databaseId));
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      Mutation m = c.element();
+      mutations.add(m);
+      int columnCount = m.asMap().size();
+      if ((mutations.size() + 1) * columnCount >= batchSize) {
+        flushBatch();
+      }
+    }
+
+    @FinishBundle
+    public void finishBundle(Context c) throws Exception {
+      if (!mutations.isEmpty()) {
+        flushBatch();
+      }
+    }
+
+    @Teardown
+    public void teardown() throws Exception {
+      if (spanner == null) {
+          return;
+      }
+      spanner.closeAsync().get();
+    }
+
+    /**
+     * Writes a batch of mutations to Cloud Spanner.
+     *
+     * <p>If a commit fails, it will be retried up to {@link #MAX_RETRIES} times.
+     * If the retry limit is exceeded, the last exception from Cloud Spanner will be
+     * thrown.
+     *
+     * @throws AbortedException if the commit fails or IOException or InterruptedException if
+     * backing off between retries fails.
+     */
+    private void flushBatch() throws AbortedException, IOException, InterruptedException {
+      LOG.debug("Writing batch of {} mutations", mutations.size());
+      Sleeper sleeper = Sleeper.DEFAULT;
+      BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff();
+
+      while (true) {
+        // Batch upsert rows.
+        try {
+          dbClient.writeAtLeastOnce(mutations);
+
+          // Break if the commit threw no exception.
+          break;
+        } catch (AbortedException exception) {
+          // Only log the code and message for potentially-transient errors. The entire exception
+          // will be propagated upon the last retry.
+          LOG.error("Error writing to Spanner ({}): {}", exception.getCode(),
+              exception.getMessage());
+          if (!BackOffUtils.next(sleeper, backoff)) {
+            LOG.error("Aborting after {} retries.", MAX_RETRIES);
+            throw exception;
+          }
+        }
+      }
+      LOG.debug("Successfully wrote {} mutations", mutations.size());
+      mutations.clear();
+    }
+
+    @Override
+    public void populateDisplayData(Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .addIfNotNull(DisplayData.item("instanceId", instanceId)
+              .withLabel("Instance"))
+          .addIfNotNull(DisplayData.item("databaseId", databaseId)
+              .withLabel("Database"));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/9cd6c3bd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/package-info.java
new file mode 100644
index 0000000..19e468c
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <p>Provides an API for reading from and writing to
+ * <a href="https://developers.google.com/spanner/">Google Cloud Spanner</a>.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;

http://git-wip-us.apache.org/repos/asf/beam/blob/9cd6c3bd/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
index 7025004..8950452 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
@@ -75,6 +75,8 @@ public class GcpApiSurfaceTest {
             classesInPackage("javax"),
             classesInPackage("org.apache.beam"),
             classesInPackage("org.apache.commons.logging"),
+            classesInPackage("com.google.cloud"),
+            classesInPackage("com.google.cloud.spanner"),
             // via Bigtable
             classesInPackage("org.joda.time"));