Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 17:09:36 UTC

[09/28] beam git commit: Revert "[BEAM-2610] This closes #3553"

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
index 8aac417..91caded 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
@@ -52,7 +52,6 @@ public class GcpApiSurfaceTest {
     @SuppressWarnings("unchecked")
     final Set<Matcher<Class<?>>> allowedClasses =
         ImmutableSet.of(
-            classesInPackage("com.google.api.core"),
             classesInPackage("com.google.api.client.googleapis"),
             classesInPackage("com.google.api.client.http"),
             classesInPackage("com.google.api.client.json"),
@@ -61,18 +60,9 @@ public class GcpApiSurfaceTest {
             classesInPackage("com.google.auth"),
             classesInPackage("com.google.bigtable.v2"),
             classesInPackage("com.google.cloud.bigtable.config"),
-            classesInPackage("com.google.spanner.v1"),
-            Matchers.<Class<?>>equalTo(com.google.api.gax.grpc.ApiException.class),
             Matchers.<Class<?>>equalTo(com.google.cloud.bigtable.grpc.BigtableClusterName.class),
             Matchers.<Class<?>>equalTo(com.google.cloud.bigtable.grpc.BigtableInstanceName.class),
             Matchers.<Class<?>>equalTo(com.google.cloud.bigtable.grpc.BigtableTableName.class),
-            Matchers.<Class<?>>equalTo(com.google.cloud.BaseServiceException.class),
-            Matchers.<Class<?>>equalTo(com.google.cloud.BaseServiceException.Error.class),
-            Matchers.<Class<?>>equalTo(com.google.cloud.BaseServiceException.ExceptionData.class),
-            Matchers.<Class<?>>equalTo(com.google.cloud.BaseServiceException.ExceptionData.Builder
-                .class),
-            Matchers.<Class<?>>equalTo(com.google.cloud.RetryHelper.RetryHelperException.class),
-            Matchers.<Class<?>>equalTo(com.google.cloud.grpc.BaseGrpcServiceException.class),
             Matchers.<Class<?>>equalTo(com.google.cloud.ByteArray.class),
             Matchers.<Class<?>>equalTo(com.google.cloud.Date.class),
             Matchers.<Class<?>>equalTo(com.google.cloud.Timestamp.class),
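
The hunk above narrows the allowlist that GcpApiSurfaceTest checks the module's public API surface against: every class reachable from the public API must satisfy at least one matcher, so removing entries tightens the permitted surface. A hedged sketch of the matcher pattern (helper names hypothetical, not Beam's ApiSurface internals):

    // Package-prefix matcher in the spirit of classesInPackage(...) above.
    static org.hamcrest.Matcher<Class<?>> inPackage(final String pkg) {
      return new org.hamcrest.CustomTypeSafeMatcher<Class<?>>("class in " + pkg) {
        @Override protected boolean matchesSafely(Class<?> c) {
          return c.getName().startsWith(pkg + ".");
        }
      };
    }

    // A surface class is acceptable iff some allowlist matcher accepts it.
    static boolean allowed(Class<?> c, java.util.Set<org.hamcrest.Matcher<Class<?>>> allowlist) {
      for (org.hamcrest.Matcher<Class<?>> m : allowlist) {
        if (m.matches(c)) { return true; }
      }
      return false;
    }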

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index d31f3a0..bfd260a 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -82,7 +82,6 @@ import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.ShardedKeyCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -132,7 +131,6 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PCollectionViews;
-import org.apache.beam.sdk.values.ShardedKey;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.ValueInSingleWindow;

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
index 91f0bae..a064bd6 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.gcp.bigtable;
 import com.google.bigtable.v2.Row;
 import com.google.cloud.bigtable.config.BigtableOptions;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -42,10 +41,8 @@ public class BigtableReadIT {
     BigtableTestOptions options = TestPipeline.testingPipelineOptions()
         .as(BigtableTestOptions.class);
 
-    String project = options.as(GcpOptions.class).getProject();
-
     BigtableOptions.Builder bigtableOptionsBuilder = new BigtableOptions.Builder()
-        .setProjectId(project)
+        .setProjectId(options.getProjectId())
         .setInstanceId(options.getInstanceId());
 
     final String tableId = "BigtableReadTest";

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java
index 03cb697..0ab7576 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java
@@ -25,6 +25,11 @@ import org.apache.beam.sdk.testing.TestPipelineOptions;
  * Properties needed when using Bigtable with the Beam SDK.
  */
 public interface BigtableTestOptions extends TestPipelineOptions {
+  @Description("Project ID for Bigtable")
+  @Default.String("apache-beam-testing")
+  String getProjectId();
+  void setProjectId(String value);
+
   @Description("Instance ID for Bigtable")
   @Default.String("beam-test")
   String getInstanceId();
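
With the projectId option added above, the Bigtable integration tests configure everything from BigtableTestOptions instead of reaching into GcpOptions. A hedged usage sketch (flag values illustrative):

    String[] args = {"--projectId=my-gcp-project", "--instanceId=my-instance"};
    BigtableTestOptions options =
        PipelineOptionsFactory.fromArgs(args).as(BigtableTestOptions.class);
    // Omitted flags fall back to the @Default.String values declared above.
    BigtableOptions bigtableOptions = new BigtableOptions.Builder()
        .setProjectId(options.getProjectId())
        .setInstanceId(options.getInstanceId())
        .build();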

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
index 010bcc4..1d168f1 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
@@ -73,17 +73,15 @@ public class BigtableWriteIT implements Serializable {
   private static BigtableTableAdminClient tableAdminClient;
   private final String tableId =
       String.format("BigtableWriteIT-%tF-%<tH-%<tM-%<tS-%<tL", new Date());
-  private String project;
 
   @Before
   public void setup() throws Exception {
     PipelineOptionsFactory.register(BigtableTestOptions.class);
     options = TestPipeline.testingPipelineOptions().as(BigtableTestOptions.class);
-    project = options.as(GcpOptions.class).getProject();
 
     bigtableOptions =
         new Builder()
-            .setProjectId(project)
+            .setProjectId(options.getProjectId())
             .setInstanceId(options.getInstanceId())
             .setUserAgent("apache-beam-test")
             .build();

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottlerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottlerTest.java
deleted file mode 100644
index c12cf55..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottlerTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.datastore;
-
-import static org.hamcrest.Matchers.closeTo;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-
-import java.util.Random;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mockito;
-
-/**
- * Tests for {@link AdaptiveThrottler}.
- */
-@RunWith(JUnit4.class)
-public class AdaptiveThrottlerTest {
-
-  static final long START_TIME_MS = 0;
-  static final long SAMPLE_PERIOD_MS = 60000;
-  static final long SAMPLE_BUCKET_MS = 1000;
-  static final double OVERLOAD_RATIO = 2;
-
-  /** Returns a throttler configured with the standard parameters above. */
-  AdaptiveThrottler getThrottler() {
-    return new AdaptiveThrottler(SAMPLE_PERIOD_MS, SAMPLE_BUCKET_MS, OVERLOAD_RATIO);
-  }
-
-  @Test
-  public void testNoInitialThrottling() throws Exception {
-    AdaptiveThrottler throttler = getThrottler();
-    assertThat(throttler.throttlingProbability(START_TIME_MS), equalTo(0.0));
-    assertThat("first request is not throttled",
-        throttler.throttleRequest(START_TIME_MS), equalTo(false));
-  }
-
-  @Test
-  public void testNoThrottlingIfNoErrors() throws Exception {
-    AdaptiveThrottler throttler = getThrottler();
-    long t = START_TIME_MS;
-    for (; t < START_TIME_MS + 20; t++) {
-      assertFalse(throttler.throttleRequest(t));
-      throttler.successfulRequest(t);
-    }
-    assertThat(throttler.throttlingProbability(t), equalTo(0.0));
-  }
-
-  @Test
-  public void testNoThrottlingAfterErrorsExpire() throws Exception {
-    AdaptiveThrottler throttler = getThrottler();
-    long t = START_TIME_MS;
-    for (; t < START_TIME_MS + SAMPLE_PERIOD_MS; t++) {
-      throttler.throttleRequest(t);
-      // and no successfulRequest.
-    }
-    assertThat("check that we set up a non-zero probability of throttling",
-        throttler.throttlingProbability(t), greaterThan(0.0));
-    for (; t < START_TIME_MS + 2 * SAMPLE_PERIOD_MS; t++) {
-      throttler.throttleRequest(t);
-      throttler.successfulRequest(t);
-    }
-    assertThat(throttler.throttlingProbability(t), equalTo(0.0));
-  }
-
-  @Test
-  public void testThrottlingAfterErrors() throws Exception {
-    Random mockRandom = Mockito.mock(Random.class);
-    Mockito.when(mockRandom.nextDouble()).thenReturn(
-        0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9,
-        0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9);
-    AdaptiveThrottler throttler = new AdaptiveThrottler(
-        SAMPLE_PERIOD_MS, SAMPLE_BUCKET_MS, OVERLOAD_RATIO, mockRandom);
-    for (int i = 0; i < 20; i++) {
-      boolean throttled = throttler.throttleRequest(START_TIME_MS + i);
-      // 1/3rd of requests succeeding.
-      if (i % 3 == 1) {
-        throttler.successfulRequest(START_TIME_MS + i);
-      }
-
-      // Once we have some history in place, check what throttling happens.
-      if (i >= 10) {
-        // Expect 1/3rd of requests to be throttled. (So 1/3rd throttled, 1/3rd succeeding, 1/3rd
-        // tried and failing).
-        assertThat(String.format("for i=%d", i),
-            throttler.throttlingProbability(START_TIME_MS + i), closeTo(0.33, /*error=*/ 0.1));
-        // Requests 10..13 should be throttled, 14..19 not throttled given the mocked random numbers
-        // that we fed to throttler.
-        assertThat(String.format("for i=%d", i), throttled, equalTo(i < 14));
-      }
-    }
-  }
-}
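
The deleted expectations pin down the throttling rule: zero probability with no history, a probability tending to 1/3 when one request in three succeeds under OVERLOAD_RATIO = 2, and decay back to zero once errors age out of the sample window. That is consistent with the classic client-side adaptive throttling formula p = max(0, (requests - ratio * successes) / (requests + 1)): with R requests and R/3 successes, p = (R - 2R/3) / (R + 1), which approaches 1/3. A hedged sketch (window bookkeeping elided, names hypothetical):

    import java.util.Random;

    // Illustrative only; Beam's AdaptiveThrottler additionally buckets counts
    // into a sliding sample window (SAMPLE_PERIOD_MS / SAMPLE_BUCKET_MS above).
    class SketchThrottler {
      private final double overloadRatio;
      private final Random random = new Random();
      private long requests;    // all attempts, throttled or not
      private long successes;   // attempts reported via successfulRequest()

      SketchThrottler(double overloadRatio) { this.overloadRatio = overloadRatio; }

      double throttlingProbability() {
        return Math.max(0.0, (requests - overloadRatio * successes) / (requests + 1));
      }

      boolean throttleRequest() {
        boolean throttle = random.nextDouble() < throttlingProbability();
        requests++;
        return throttle;
      }

      void successfulRequest() { successes++; }
    }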

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
index a3f5d38..460049e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
@@ -27,7 +27,7 @@ import static com.google.datastore.v1.client.DatastoreHelper.makeOrder;
 import static com.google.datastore.v1.client.DatastoreHelper.makeUpsert;
 import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
 import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DATASTORE_BATCH_UPDATE_BYTES_LIMIT;
-import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START;
+import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DATASTORE_BATCH_UPDATE_LIMIT;
 import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.DEFAULT_BUNDLE_SIZE_BYTES;
 import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.QUERY_BATCH_LIMIT;
 import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.getEstimatedSizeBytes;
@@ -606,7 +606,7 @@ public class DatastoreV1Test {
   /** Tests {@link DatastoreWriterFn} with entities of more than one batches, but not a multiple. */
   @Test
   public void testDatatoreWriterFnWithMultipleBatches() throws Exception {
-    datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_ENTITIES_START * 3 + 100);
+    datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 3 + 100);
   }
 
   /**
@@ -615,7 +615,7 @@ public class DatastoreV1Test {
    */
   @Test
   public void testDatatoreWriterFnWithBatchesExactMultiple() throws Exception {
-    datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_ENTITIES_START * 2);
+    datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 2);
   }
 
   // A helper method to test DatastoreWriterFn for various batch sizes.
@@ -628,14 +628,14 @@ public class DatastoreV1Test {
     }
 
     DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(StaticValueProvider.of(PROJECT_ID),
-        null, mockDatastoreFactory, new FakeWriteBatcher());
+        null, mockDatastoreFactory);
     DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
     doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
     doFnTester.processBundle(mutations);
 
     int start = 0;
     while (start < numMutations) {
-      int end = Math.min(numMutations, start + DATASTORE_BATCH_UPDATE_ENTITIES_START);
+      int end = Math.min(numMutations, start + DATASTORE_BATCH_UPDATE_LIMIT);
       CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
       commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
       commitRequest.addAllMutations(mutations.subList(start, end));
@@ -651,28 +651,26 @@ public class DatastoreV1Test {
   @Test
   public void testDatatoreWriterFnWithLargeEntities() throws Exception {
     List<Mutation> mutations = new ArrayList<>();
-    int entitySize = 0;
+    int propertySize = 900_000;
     for (int i = 0; i < 12; ++i) {
-      Entity entity = Entity.newBuilder().setKey(makeKey("key" + i, i + 1))
-        .putProperties("long", makeValue(new String(new char[900_000])
-              ).setExcludeFromIndexes(true).build())
-        .build();
-      entitySize = entity.getSerializedSize(); // Take the size of any one entity.
-      mutations.add(makeUpsert(entity).build());
+      Entity.Builder entity = Entity.newBuilder().setKey(makeKey("key" + i, i + 1));
+      entity.putProperties("long", makeValue(new String(new char[propertySize])
+            ).setExcludeFromIndexes(true).build());
+      mutations.add(makeUpsert(entity.build()).build());
     }
 
     DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(StaticValueProvider.of(PROJECT_ID),
-        null, mockDatastoreFactory, new FakeWriteBatcher());
+        null, mockDatastoreFactory);
     DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
     doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
     doFnTester.processBundle(mutations);
 
     // This test is over-specific currently; it requires that we split the 12 entity writes into 3
     // requests, but we only need each CommitRequest to be less than 10MB in size.
-    int entitiesPerRpc = DATASTORE_BATCH_UPDATE_BYTES_LIMIT / entitySize;
+    int propertiesPerRpc = DATASTORE_BATCH_UPDATE_BYTES_LIMIT / propertySize;
     int start = 0;
     while (start < mutations.size()) {
-      int end = Math.min(mutations.size(), start + entitiesPerRpc);
+      int end = Math.min(mutations.size(), start + propertiesPerRpc);
       CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
       commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
       commitRequest.addAllMutations(mutations.subList(start, end));
@@ -783,7 +781,7 @@ public class DatastoreV1Test {
    */
   @Test
   public void testSplitQueryFnWithQueryLimit() throws Exception {
-    Query queryWithLimit = QUERY.toBuilder()
+    Query queryWithLimit = QUERY.toBuilder().clone()
         .setLimit(Int32Value.newBuilder().setValue(1))
         .build();
 
@@ -896,50 +894,6 @@ public class DatastoreV1Test {
         .apply(DatastoreIO.v1().write().withProjectId(options.getDatastoreProject()));
   }
 
-  @Test
-  public void testWriteBatcherWithoutData() {
-    DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
-    writeBatcher.start();
-    assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START, writeBatcher.nextBatchSize(0));
-  }
-
-  @Test
-  public void testWriteBatcherFastQueries() {
-    DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
-    writeBatcher.start();
-    writeBatcher.addRequestLatency(0, 1000, 200);
-    writeBatcher.addRequestLatency(0, 1000, 200);
-    assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_LIMIT, writeBatcher.nextBatchSize(0));
-  }
-
-  @Test
-  public void testWriteBatcherSlowQueries() {
-    DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
-    writeBatcher.start();
-    writeBatcher.addRequestLatency(0, 10000, 200);
-    writeBatcher.addRequestLatency(0, 10000, 200);
-    assertEquals(100, writeBatcher.nextBatchSize(0));
-  }
-
-  @Test
-  public void testWriteBatcherSizeNotBelowMinimum() {
-    DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
-    writeBatcher.start();
-    writeBatcher.addRequestLatency(0, 30000, 50);
-    writeBatcher.addRequestLatency(0, 30000, 50);
-    assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_MIN, writeBatcher.nextBatchSize(0));
-  }
-
-  @Test
-  public void testWriteBatcherSlidingWindow() {
-    DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
-    writeBatcher.start();
-    writeBatcher.addRequestLatency(0, 30000, 50);
-    writeBatcher.addRequestLatency(50000, 5000, 200);
-    writeBatcher.addRequestLatency(100000, 5000, 200);
-    assertEquals(200, writeBatcher.nextBatchSize(150000));
-  }
-
   /** Helper Methods */
 
   /** A helper function that verifies if all the queries have unique keys. */
@@ -1079,24 +1033,8 @@ public class DatastoreV1Test {
   private List<Query> splitQuery(Query query, int numSplits) {
     List<Query> queries = new LinkedList<>();
     for (int i = 0; i < numSplits; i++) {
-      queries.add(query.toBuilder().build());
+      queries.add(query.toBuilder().clone().build());
     }
     return queries;
   }
-
-  /**
-   * A WriteBatcher for unit tests, which does no timing-based adjustments (so unit tests have
-   * consistent results).
-   */
-  static class FakeWriteBatcher implements DatastoreV1.WriteBatcher {
-    @Override
-    public void start() {}
-    @Override
-    public void addRequestLatency(long timeSinceEpochMillis, long latencyMillis, int numMutations) {
-    }
-    @Override
-    public int nextBatchSize(long timeSinceEpochMillis) {
-      return DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START;
-    }
-  }
 }
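
Before this revert, DatastoreWriterFn sized its batches through a WriteBatcher (start / addRequestLatency / nextBatchSize, per the deleted FakeWriteBatcher); after it, batches are cut at the fixed DATASTORE_BATCH_UPDATE_LIMIT, which is why the expectations above recompute RPC boundaries with that constant. A hedged sketch of a latency-aware batcher consistent with the deleted tests' numbers (constants illustrative, not Beam's WriteBatcherImpl):

    // Shrinks batch size as observed per-mutation latency grows,
    // targeting a rough per-RPC latency budget.
    class SketchWriteBatcher {
      private long totalLatencyMillis;
      private int totalMutations;

      void addRequestLatency(long timeSinceEpochMillis, long latencyMillis, int numMutations) {
        totalLatencyMillis += latencyMillis;
        totalMutations += numMutations;
      }

      int nextBatchSize(long timeSinceEpochMillis) {
        if (totalMutations == 0) {
          return 200;  // start size before any samples
        }
        double msPerMutation = (double) totalLatencyMillis / totalMutations;
        int size = (int) (5000 / Math.max(1.0, msPerMutation));  // ~5s RPC budget
        return Math.max(10, Math.min(500, size));                // clamp to [min, limit]
      }
    }

Against the deleted inputs: 1000 ms for 200 mutations is 5 ms each and clamps to the maximum; 10000 ms for 200 is 50 ms each, giving the 100 that testWriteBatcherSlowQueries expected; 30000 ms for 50 clamps to the minimum.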

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
index cd61229..5e618df 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
@@ -374,7 +374,7 @@ class V1TestUtil {
 
     // Read the next batch of query results.
     private Iterator<EntityResult> getIteratorAndMoveCursor() throws DatastoreException {
-      Query.Builder query = this.query.toBuilder();
+      Query.Builder query = this.query.toBuilder().clone();
       query.setLimit(Int32Value.newBuilder().setValue(QUERY_BATCH_LIMIT));
       if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) {
         query.setStartCursor(currentBatch.getEndCursor());
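
The same toBuilder().clone() pattern is restored in DatastoreV1Test above. A short sketch of what the chained call does (baseQuery is a hypothetical name): toBuilder() yields a builder seeded from the immutable message, and clone() copies that builder, so the chain is a defensive double-copy before mutating limit or cursor.

    Query.Builder paged = baseQuery.toBuilder().clone();
    paged.setLimit(Int32Value.newBuilder().setValue(QUERY_BATCH_LIMIT));
    Query next = paged.build();   // baseQuery itself is never modified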

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/FakeServiceFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/FakeServiceFactory.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/FakeServiceFactory.java
deleted file mode 100644
index 753d807..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/FakeServiceFactory.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.withSettings;
-
-import com.google.cloud.ServiceFactory;
-import com.google.cloud.spanner.DatabaseClient;
-import com.google.cloud.spanner.DatabaseId;
-import com.google.cloud.spanner.Spanner;
-import com.google.cloud.spanner.SpannerOptions;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import javax.annotation.concurrent.GuardedBy;
-import org.mockito.Matchers;
-
-/**
- * A serialization friendly type service factory that maintains a mock {@link Spanner} and
- * {@link DatabaseClient}.
- * */
-class FakeServiceFactory
-    implements ServiceFactory<Spanner, SpannerOptions>, Serializable {
-
-  // Marked as static so they could be returned by serviceFactory, which is serializable.
-  private static final Object lock = new Object();
-
-  @GuardedBy("lock")
-  private static final List<Spanner> mockSpanners = new ArrayList<>();
-
-  @GuardedBy("lock")
-  private static final List<DatabaseClient> mockDatabaseClients = new ArrayList<>();
-
-  @GuardedBy("lock")
-  private static int count = 0;
-
-  private final int index;
-
-  public FakeServiceFactory() {
-    synchronized (lock) {
-      index = count++;
-      mockSpanners.add(mock(Spanner.class, withSettings().serializable()));
-      mockDatabaseClients.add(mock(DatabaseClient.class, withSettings().serializable()));
-    }
-    when(mockSpanner().getDatabaseClient(Matchers.any(DatabaseId.class)))
-        .thenReturn(mockDatabaseClient());
-  }
-
-  DatabaseClient mockDatabaseClient() {
-    synchronized (lock) {
-      return mockDatabaseClients.get(index);
-    }
-  }
-
-  Spanner mockSpanner() {
-    synchronized (lock) {
-      return mockSpanners.get(index);
-    }
-  }
-
-  @Override
-  public Spanner create(SpannerOptions serviceOptions) {
-    return mockSpanner();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java
index 013b83d..03eb28e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java
@@ -135,16 +135,4 @@ public class MutationSizeEstimatorTest {
     assertThat(MutationSizeEstimator.sizeOf(timestampArray), is(24L));
     assertThat(MutationSizeEstimator.sizeOf(dateArray), is(48L));
   }
-
-  @Test
-  public void group() throws Exception {
-    Mutation int64 = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build();
-    Mutation float64 = Mutation.newInsertOrUpdateBuilder("test").set("one").to(2.9).build();
-    Mutation bool = Mutation.newInsertOrUpdateBuilder("test").set("one").to(false).build();
-
-    MutationGroup group = MutationGroup.create(int64, float64, bool);
-
-    assertThat(MutationSizeEstimator.sizeOf(group), is(17L));
-  }
-
 }
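
As a consistency check on the deleted group() assertion: the 17L matches an estimator that counts 8 bytes for an INT64, 8 for a FLOAT64, and 1 for a BOOL, so a MutationGroup holding one of each estimates to 8 + 8 + 1 = 17. (A reading of the deleted test, not a spec of MutationSizeEstimator.)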

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/RandomUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/RandomUtils.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/RandomUtils.java
deleted file mode 100644
index f479b4a..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/RandomUtils.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner;
-
-import java.util.Random;
-
-/**
- * Useful randomness related utilities.
- */
-public class RandomUtils {
-
-  private static final char[] ALPHANUMERIC = "1234567890abcdefghijklmnopqrstuvwxyz".toCharArray();
-
-  private RandomUtils() {
-  }
-
-  public static String randomAlphaNumeric(int length) {
-    Random random = new Random();
-    char[] result = new char[length];
-    for (int i = 0; i < length; i++) {
-      result[i] = ALPHANUMERIC[random.nextInt(ALPHANUMERIC.length)];
-    }
-    return new String(result);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java
deleted file mode 100644
index 5ba2da0..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner;
-
-import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.google.cloud.Timestamp;
-import com.google.cloud.spanner.KeySet;
-import com.google.cloud.spanner.ReadOnlyTransaction;
-import com.google.cloud.spanner.ResultSets;
-import com.google.cloud.spanner.Statement;
-import com.google.cloud.spanner.Struct;
-import com.google.cloud.spanner.TimestampBound;
-import com.google.cloud.spanner.Type;
-import com.google.cloud.spanner.Value;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.hamcrest.Matchers;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mockito;
-
-/** Unit tests for {@link SpannerIO}. */
-@RunWith(JUnit4.class)
-public class SpannerIOReadTest implements Serializable {
-  @Rule
-  public final transient TestPipeline pipeline = TestPipeline.create();
-  @Rule
-  public final transient ExpectedException thrown = ExpectedException.none();
-
-  private FakeServiceFactory serviceFactory;
-  private ReadOnlyTransaction mockTx;
-
-  private Type fakeType = Type.struct(Type.StructField.of("id", Type.int64()),
-      Type.StructField.of("name", Type.string()));
-
-  private List<Struct> fakeRows = Arrays.asList(
-      Struct.newBuilder().add("id", Value.int64(1)).add("name", Value.string("Alice")).build(),
-      Struct.newBuilder().add("id", Value.int64(2)).add("name", Value.string("Bob")).build());
-
-  @Before
-  @SuppressWarnings("unchecked")
-  public void setUp() throws Exception {
-    serviceFactory = new FakeServiceFactory();
-    mockTx = Mockito.mock(ReadOnlyTransaction.class);
-  }
-
-  @Test
-  public void emptyTransform() throws Exception {
-    SpannerIO.Read read = SpannerIO.read();
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("requires instance id to be set with");
-    read.validate(null);
-  }
-
-  @Test
-  public void emptyInstanceId() throws Exception {
-    SpannerIO.Read read = SpannerIO.read().withDatabaseId("123");
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("requires instance id to be set with");
-    read.validate(null);
-  }
-
-  @Test
-  public void emptyDatabaseId() throws Exception {
-    SpannerIO.Read read = SpannerIO.read().withInstanceId("123");
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("requires database id to be set with");
-    read.validate(null);
-  }
-
-  @Test
-  public void emptyQuery() throws Exception {
-    SpannerIO.Read read =
-        SpannerIO.read().withInstanceId("123").withDatabaseId("aaa").withTimestamp(Timestamp.now());
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("requires configuring query or read operation");
-    read.validate(null);
-  }
-
-  @Test
-  public void emptyColumns() throws Exception {
-    SpannerIO.Read read =
-        SpannerIO.read()
-            .withInstanceId("123")
-            .withDatabaseId("aaa")
-            .withTimestamp(Timestamp.now())
-            .withTable("users");
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("requires a list of columns");
-    read.validate(null);
-  }
-
-  @Test
-  public void validRead() throws Exception {
-    SpannerIO.Read read =
-        SpannerIO.read()
-            .withInstanceId("123")
-            .withDatabaseId("aaa")
-            .withTimestamp(Timestamp.now())
-            .withTable("users")
-            .withColumns("id", "name", "email");
-    read.validate(null);
-  }
-
-  @Test
-  public void validQuery() throws Exception {
-    SpannerIO.Read read =
-        SpannerIO.read()
-            .withInstanceId("123")
-            .withDatabaseId("aaa")
-            .withTimestamp(Timestamp.now())
-            .withQuery("SELECT * FROM users");
-    read.validate(null);
-  }
-
-  @Test
-  public void runQuery() throws Exception {
-    SpannerIO.Read read =
-        SpannerIO.read()
-            .withProjectId("test")
-            .withInstanceId("123")
-            .withDatabaseId("aaa")
-            .withTimestamp(Timestamp.now())
-            .withQuery("SELECT * FROM users")
-            .withServiceFactory(serviceFactory);
-
-    NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read);
-    DoFnTester<Object, Struct> fnTester = DoFnTester.of(readFn);
-
-    when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class)))
-        .thenReturn(mockTx);
-    when(mockTx.executeQuery(any(Statement.class)))
-        .thenReturn(ResultSets.forRows(fakeType, fakeRows));
-
-    List<Struct> result = fnTester.processBundle(1);
-    assertThat(result, Matchers.<Struct>iterableWithSize(2));
-
-    verify(serviceFactory.mockDatabaseClient()).readOnlyTransaction(TimestampBound
-        .strong());
-    verify(mockTx).executeQuery(Statement.of("SELECT * FROM users"));
-  }
-
-  @Test
-  public void runRead() throws Exception {
-    SpannerIO.Read read =
-        SpannerIO.read()
-            .withProjectId("test")
-            .withInstanceId("123")
-            .withDatabaseId("aaa")
-            .withTimestamp(Timestamp.now())
-            .withTable("users")
-            .withColumns("id", "name")
-            .withServiceFactory(serviceFactory);
-
-    NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read);
-    DoFnTester<Object, Struct> fnTester = DoFnTester.of(readFn);
-
-    when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class)))
-        .thenReturn(mockTx);
-    when(mockTx.read("users", KeySet.all(), Arrays.asList("id", "name")))
-        .thenReturn(ResultSets.forRows(fakeType, fakeRows));
-
-    List<Struct> result = fnTester.processBundle(1);
-    assertThat(result, Matchers.<Struct>iterableWithSize(2));
-
-    verify(serviceFactory.mockDatabaseClient()).readOnlyTransaction(TimestampBound.strong());
-    verify(mockTx).read("users", KeySet.all(), Arrays.asList("id", "name"));
-  }
-
-  @Test
-  public void runReadUsingIndex() throws Exception {
-    SpannerIO.Read read =
-        SpannerIO.read()
-            .withProjectId("test")
-            .withInstanceId("123")
-            .withDatabaseId("aaa")
-            .withTimestamp(Timestamp.now())
-            .withTable("users")
-            .withColumns("id", "name")
-            .withIndex("theindex")
-            .withServiceFactory(serviceFactory);
-
-    NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read);
-    DoFnTester<Object, Struct> fnTester = DoFnTester.of(readFn);
-
-    when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class)))
-        .thenReturn(mockTx);
-    when(mockTx.readUsingIndex("users", "theindex", KeySet.all(), Arrays.asList("id", "name")))
-        .thenReturn(ResultSets.forRows(fakeType, fakeRows));
-
-    List<Struct> result = fnTester.processBundle(1);
-    assertThat(result, Matchers.<Struct>iterableWithSize(2));
-
-    verify(serviceFactory.mockDatabaseClient()).readOnlyTransaction(TimestampBound.strong());
-    verify(mockTx).readUsingIndex("users", "theindex", KeySet.all(), Arrays.asList("id", "name"));
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void readPipeline() throws Exception {
-    Timestamp timestamp = Timestamp.ofTimeMicroseconds(12345);
-
-    PCollectionView<Transaction> tx = pipeline
-        .apply("tx", SpannerIO.createTransaction()
-            .withProjectId("test")
-            .withInstanceId("123")
-            .withDatabaseId("aaa")
-            .withServiceFactory(serviceFactory));
-
-    PCollection<Struct> one = pipeline.apply("read q", SpannerIO.read()
-        .withProjectId("test")
-        .withInstanceId("123")
-        .withDatabaseId("aaa")
-        .withTimestamp(Timestamp.now())
-        .withQuery("SELECT * FROM users")
-        .withServiceFactory(serviceFactory)
-        .withTransaction(tx));
-    PCollection<Struct> two = pipeline.apply("read r", SpannerIO.read()
-        .withProjectId("test")
-        .withInstanceId("123")
-        .withDatabaseId("aaa")
-        .withTimestamp(Timestamp.now())
-        .withTable("users")
-        .withColumns("id", "name")
-        .withServiceFactory(serviceFactory)
-        .withTransaction(tx));
-
-    when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class)))
-        .thenReturn(mockTx);
-
-    when(mockTx.executeQuery(Statement.of("SELECT 1"))).thenReturn(ResultSets.forRows(Type.struct(),
-        Collections.<Struct>emptyList()));
-
-    when(mockTx.executeQuery(Statement.of("SELECT * FROM users")))
-        .thenReturn(ResultSets.forRows(fakeType, fakeRows));
-    when(mockTx.read("users", KeySet.all(), Arrays.asList("id", "name")))
-        .thenReturn(ResultSets.forRows(fakeType, fakeRows));
-    when(mockTx.getReadTimestamp()).thenReturn(timestamp);
-
-    PAssert.that(one).containsInAnyOrder(fakeRows);
-    PAssert.that(two).containsInAnyOrder(fakeRows);
-
-    pipeline.run();
-
-    verify(serviceFactory.mockDatabaseClient(), times(2))
-        .readOnlyTransaction(TimestampBound.ofReadTimestamp(timestamp));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java
new file mode 100644
index 0000000..5bdfea5
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
+
+import com.google.api.core.ApiFuture;
+import com.google.cloud.ServiceFactory;
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.DatabaseId;
+import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.Spanner;
+import com.google.cloud.spanner.SpannerOptions;
+import com.google.common.collect.Iterables;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import javax.annotation.concurrent.GuardedBy;
+
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Matchers;
+
+
+/**
+ * Unit tests for {@link SpannerIO}.
+ */
+@RunWith(JUnit4.class)
+public class SpannerIOTest implements Serializable {
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  private FakeServiceFactory serviceFactory;
+
+  @Before
+  @SuppressWarnings("unchecked")
+  public void setUp() throws Exception {
+    serviceFactory = new FakeServiceFactory();
+  }
+
+  @Test
+  public void emptyTransform() throws Exception {
+    SpannerIO.Write write = SpannerIO.write();
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("requires instance id to be set with");
+    write.validate(null);
+  }
+
+  @Test
+  public void emptyInstanceId() throws Exception {
+    SpannerIO.Write write = SpannerIO.write().withDatabaseId("123");
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("requires instance id to be set with");
+    write.validate(null);
+  }
+
+  @Test
+  public void emptyDatabaseId() throws Exception {
+    SpannerIO.Write write = SpannerIO.write().withInstanceId("123");
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("requires database id to be set with");
+    write.validate(null);
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void singleMutationPipeline() throws Exception {
+    Mutation mutation = Mutation.newInsertOrUpdateBuilder("test").set("one").to(2).build();
+    PCollection<Mutation> mutations = pipeline.apply(Create.of(mutation));
+
+    mutations.apply(
+        SpannerIO.write()
+            .withProjectId("test-project")
+            .withInstanceId("test-instance")
+            .withDatabaseId("test-database")
+            .withServiceFactory(serviceFactory));
+    pipeline.run();
+    verify(serviceFactory.mockSpanner())
+        .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database"));
+    verify(serviceFactory.mockDatabaseClient(), times(1))
+        .writeAtLeastOnce(argThat(new IterableOfSize(1)));
+  }
+
+  @Test
+  public void batching() throws Exception {
+    Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build();
+    Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build();
+    SpannerIO.Write write =
+        SpannerIO.write()
+            .withProjectId("test-project")
+            .withInstanceId("test-instance")
+            .withDatabaseId("test-database")
+            .withBatchSizeBytes(1000000000)
+            .withServiceFactory(serviceFactory);
+    SpannerIO.SpannerWriteFn writerFn = new SpannerIO.SpannerWriteFn(write);
+    DoFnTester<Mutation, Void> fnTester = DoFnTester.of(writerFn);
+    fnTester.processBundle(Arrays.asList(one, two));
+
+    verify(serviceFactory.mockSpanner())
+        .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database"));
+    verify(serviceFactory.mockDatabaseClient(), times(1))
+        .writeAtLeastOnce(argThat(new IterableOfSize(2)));
+  }
+
+  @Test
+  public void batchingGroups() throws Exception {
+    Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build();
+    Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build();
+    Mutation three = Mutation.newInsertOrUpdateBuilder("test").set("three").to(3).build();
+
+    // Have a room to accumulate one more item.
+    long batchSize = MutationSizeEstimator.sizeOf(one) + 1;
+
+    SpannerIO.Write write =
+        SpannerIO.write()
+            .withProjectId("test-project")
+            .withInstanceId("test-instance")
+            .withDatabaseId("test-database")
+            .withBatchSizeBytes(batchSize)
+            .withServiceFactory(serviceFactory);
+    SpannerIO.SpannerWriteFn writerFn = new SpannerIO.SpannerWriteFn(write);
+    DoFnTester<Mutation, Void> fnTester = DoFnTester.of(writerFn);
+    fnTester.processBundle(Arrays.asList(one, two, three));
+
+    verify(serviceFactory.mockSpanner())
+        .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database"));
+    verify(serviceFactory.mockDatabaseClient(), times(1))
+        .writeAtLeastOnce(argThat(new IterableOfSize(2)));
+    verify(serviceFactory.mockDatabaseClient(), times(1))
+        .writeAtLeastOnce(argThat(new IterableOfSize(1)));
+  }
+
+  @Test
+  public void noBatching() throws Exception {
+    Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build();
+    Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build();
+    SpannerIO.Write write =
+        SpannerIO.write()
+            .withProjectId("test-project")
+            .withInstanceId("test-instance")
+            .withDatabaseId("test-database")
+            .withBatchSizeBytes(0) // turn off batching.
+            .withServiceFactory(serviceFactory);
+    SpannerIO.SpannerWriteFn writerFn = new SpannerIO.SpannerWriteFn(write);
+    DoFnTester<Mutation, Void> fnTester = DoFnTester.of(writerFn);
+    fnTester.processBundle(Arrays.asList(one, two));
+
+    verify(serviceFactory.mockSpanner())
+        .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database"));
+    verify(serviceFactory.mockDatabaseClient(), times(2))
+        .writeAtLeastOnce(argThat(new IterableOfSize(1)));
+  }
+
+  private static class FakeServiceFactory
+      implements ServiceFactory<Spanner, SpannerOptions>, Serializable {
+    // Marked as static so they could be returned by serviceFactory, which is serializable.
+    private static final Object lock = new Object();
+
+    @GuardedBy("lock")
+    private static final List<Spanner> mockSpanners = new ArrayList<>();
+
+    @GuardedBy("lock")
+    private static final List<DatabaseClient> mockDatabaseClients = new ArrayList<>();
+
+    @GuardedBy("lock")
+    private static int count = 0;
+
+    private final int index;
+
+    public FakeServiceFactory() {
+      synchronized (lock) {
+        index = count++;
+        mockSpanners.add(mock(Spanner.class, withSettings().serializable()));
+        mockDatabaseClients.add(mock(DatabaseClient.class, withSettings().serializable()));
+      }
+      ApiFuture voidFuture = mock(ApiFuture.class, withSettings().serializable());
+      when(mockSpanner().getDatabaseClient(Matchers.any(DatabaseId.class)))
+          .thenReturn(mockDatabaseClient());
+      when(mockSpanner().closeAsync()).thenReturn(voidFuture);
+    }
+
+    DatabaseClient mockDatabaseClient() {
+      synchronized (lock) {
+        return mockDatabaseClients.get(index);
+      }
+    }
+
+    Spanner mockSpanner() {
+      synchronized (lock) {
+        return mockSpanners.get(index);
+      }
+    }
+
+    @Override
+    public Spanner create(SpannerOptions serviceOptions) {
+      return mockSpanner();
+    }
+  }
+
+  private static class IterableOfSize extends ArgumentMatcher<Iterable<Mutation>> {
+    private final int size;
+
+    private IterableOfSize(int size) {
+      this.size = size;
+    }
+
+    @Override
+    public boolean matches(Object argument) {
+      return argument instanceof Iterable && Iterables.size((Iterable<?>) argument) == size;
+    }
+  }
+}
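
The three batching tests in the new file pin down the flush rule: with a budget of sizeOf(one) + 1 bytes, the second equally-sized mutation pushes the buffer over budget (one flush of two), the trailing third is flushed at bundle finish (one flush of one), and a budget of 0 flushes every element individually. A hedged sketch of that rule (names illustrative, not SpannerIO.SpannerWriteFn's internals):

    List<Mutation> buffer = new ArrayList<>();
    long bufferedBytes = 0;
    for (Mutation m : incoming) {                     // processElement equivalent
      buffer.add(m);
      bufferedBytes += MutationSizeEstimator.sizeOf(m);
      if (bufferedBytes >= batchSizeBytes) {          // 0 disables batching
        databaseClient.writeAtLeastOnce(new ArrayList<>(buffer));
        buffer.clear();
        bufferedBytes = 0;
      }
    }
    if (!buffer.isEmpty()) {                          // finishBundle equivalent
      databaseClient.writeAtLeastOnce(buffer);
    }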

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
deleted file mode 100644
index 09cdb8e..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner;
-
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static org.hamcrest.Matchers.hasSize;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.argThat;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import com.google.cloud.spanner.DatabaseId;
-import com.google.cloud.spanner.Mutation;
-import com.google.common.collect.Iterables;
-import java.io.Serializable;
-import java.util.Arrays;
-
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.ArgumentMatcher;
-
-/**
- * Unit tests for {@link SpannerIO}.
- */
-@RunWith(JUnit4.class)
-public class SpannerIOWriteTest implements Serializable {
-  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
-  @Rule public transient ExpectedException thrown = ExpectedException.none();
-
-  private FakeServiceFactory serviceFactory;
-
-  @Before
-  @SuppressWarnings("unchecked")
-  public void setUp() throws Exception {
-    serviceFactory = new FakeServiceFactory();
-  }
-
-  @Test
-  public void emptyTransform() throws Exception {
-    SpannerIO.Write write = SpannerIO.write();
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("requires instance id to be set with");
-    write.validate(null);
-  }
-
-  @Test
-  public void emptyInstanceId() throws Exception {
-    SpannerIO.Write write = SpannerIO.write().withDatabaseId("123");
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("requires instance id to be set with");
-    write.validate(null);
-  }
-
-  @Test
-  public void emptyDatabaseId() throws Exception {
-    SpannerIO.Write write = SpannerIO.write().withInstanceId("123");
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("requires database id to be set with");
-    write.validate(null);
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void singleMutationPipeline() throws Exception {
-    Mutation mutation = Mutation.newInsertOrUpdateBuilder("test").set("one").to(2).build();
-    PCollection<Mutation> mutations = pipeline.apply(Create.of(mutation));
-
-    mutations.apply(
-        SpannerIO.write()
-            .withProjectId("test-project")
-            .withInstanceId("test-instance")
-            .withDatabaseId("test-database")
-            .withServiceFactory(serviceFactory));
-    pipeline.run();
-    verify(serviceFactory.mockSpanner())
-        .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database"));
-    verify(serviceFactory.mockDatabaseClient(), times(1))
-        .writeAtLeastOnce(argThat(new IterableOfSize(1)));
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void singleMutationGroupPipeline() throws Exception {
-    Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build();
-    Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build();
-    Mutation three = Mutation.newInsertOrUpdateBuilder("test").set("three").to(3).build();
-    PCollection<MutationGroup> mutations = pipeline
-        .apply(Create.<MutationGroup>of(g(one, two, three)));
-    mutations.apply(
-        SpannerIO.write()
-            .withProjectId("test-project")
-            .withInstanceId("test-instance")
-            .withDatabaseId("test-database")
-            .withServiceFactory(serviceFactory)
-            .grouped());
-    pipeline.run();
-    verify(serviceFactory.mockSpanner())
-        .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database"));
-    verify(serviceFactory.mockDatabaseClient(), times(1))
-        .writeAtLeastOnce(argThat(new IterableOfSize(3)));
-  }
-
-  @Test
-  public void batching() throws Exception {
-    MutationGroup one = g(Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build());
-    MutationGroup two = g(Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build());
-    SpannerIO.Write write =
-        SpannerIO.write()
-            .withProjectId("test-project")
-            .withInstanceId("test-instance")
-            .withDatabaseId("test-database")
-            .withBatchSizeBytes(1000000000)
-            .withServiceFactory(serviceFactory);
-    SpannerWriteGroupFn writerFn = new SpannerWriteGroupFn(write);
-    DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn);
-    fnTester.processBundle(Arrays.asList(one, two));
-
-    verify(serviceFactory.mockSpanner())
-        .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database"));
-    verify(serviceFactory.mockDatabaseClient(), times(1))
-        .writeAtLeastOnce(argThat(new IterableOfSize(2)));
-  }
-
-  @Test
-  public void batchingGroups() throws Exception {
-    MutationGroup one = g(Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build());
-    MutationGroup two = g(Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build());
-    MutationGroup three = g(Mutation.newInsertOrUpdateBuilder("test").set("three").to(3).build());
-
-    // Leave room to accumulate exactly one more item.
-    long batchSize = MutationSizeEstimator.sizeOf(one) + 1;
-
-    SpannerIO.Write write =
-        SpannerIO.write()
-            .withProjectId("test-project")
-            .withInstanceId("test-instance")
-            .withDatabaseId("test-database")
-            .withBatchSizeBytes(batchSize)
-            .withServiceFactory(serviceFactory);
-    SpannerWriteGroupFn writerFn = new SpannerWriteGroupFn(write);
-    DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn);
-    fnTester.processBundle(Arrays.asList(one, two, three));
-
-    verify(serviceFactory.mockSpanner())
-        .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database"));
-    verify(serviceFactory.mockDatabaseClient(), times(1))
-        .writeAtLeastOnce(argThat(new IterableOfSize(2)));
-    verify(serviceFactory.mockDatabaseClient(), times(1))
-        .writeAtLeastOnce(argThat(new IterableOfSize(1)));
-  }
-
-  @Test
-  public void noBatching() throws Exception {
-    MutationGroup one = g(Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build());
-    MutationGroup two = g(Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build());
-    SpannerIO.Write write =
-        SpannerIO.write()
-            .withProjectId("test-project")
-            .withInstanceId("test-instance")
-            .withDatabaseId("test-database")
-            .withBatchSizeBytes(0) // turn off batching.
-            .withServiceFactory(serviceFactory);
-    SpannerWriteGroupFn writerFn = new SpannerWriteGroupFn(write);
-    DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn);
-    fnTester.processBundle(Arrays.asList(one, two));
-
-    verify(serviceFactory.mockSpanner())
-        .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database"));
-    verify(serviceFactory.mockDatabaseClient(), times(2))
-        .writeAtLeastOnce(argThat(new IterableOfSize(1)));
-  }
-
-  @Test
-  public void groups() throws Exception {
-    Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build();
-    Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build();
-    Mutation three = Mutation.newInsertOrUpdateBuilder("test").set("three").to(3).build();
-
-    // Smallest batch size
-    long batchSize = 1;
-
-    SpannerIO.Write write =
-        SpannerIO.write()
-            .withProjectId("test-project")
-            .withInstanceId("test-instance")
-            .withDatabaseId("test-database")
-            .withBatchSizeBytes(batchSize)
-            .withServiceFactory(serviceFactory);
-    SpannerWriteGroupFn writerFn = new SpannerWriteGroupFn(write);
-    DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn);
-    fnTester.processBundle(Arrays.asList(g(one, two, three)));
-
-    verify(serviceFactory.mockSpanner())
-        .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database"));
-    verify(serviceFactory.mockDatabaseClient(), times(1))
-        .writeAtLeastOnce(argThat(new IterableOfSize(3)));
-  }
-
-  @Test
-  public void displayData() throws Exception {
-    SpannerIO.Write write =
-        SpannerIO.write()
-            .withProjectId("test-project")
-            .withInstanceId("test-instance")
-            .withDatabaseId("test-database")
-            .withBatchSizeBytes(123);
-
-    DisplayData data = DisplayData.from(write);
-    assertThat(data.items(), hasSize(4));
-    assertThat(data, hasDisplayItem("projectId", "test-project"));
-    assertThat(data, hasDisplayItem("instanceId", "test-instance"));
-    assertThat(data, hasDisplayItem("databaseId", "test-database"));
-    assertThat(data, hasDisplayItem("batchSizeBytes", 123));
-  }
-
-  private static class IterableOfSize extends ArgumentMatcher<Iterable<Mutation>> {
-    private final int size;
-
-    private IterableOfSize(int size) {
-      this.size = size;
-    }
-
-    @Override
-    public boolean matches(Object argument) {
-      return argument instanceof Iterable && Iterables.size((Iterable<?>) argument) == size;
-    }
-  }
-
-  private static MutationGroup g(Mutation m, Mutation... other) {
-    return MutationGroup.create(m, other);
-  }
-}
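
Context for the deleted tests above: batching, batchingGroups, noBatching and groups together pin down a size-bounded accumulate-and-flush policy. Groups are appended until the estimated byte size of the batch reaches batchSizeBytes, the batch is flushed, and a cap of zero degenerates to one write per element. The following is a minimal sketch of that policy consistent with the test expectations, not the reverted implementation itself; the element-size inputs and the flush target stand in for MutationSizeEstimator.sizeOf and DatabaseClient.writeAtLeastOnce.

    import java.util.ArrayList;
    import java.util.List;

    /** Illustrative size-bounded batcher matching the expectations above. */
    class SizeBoundedBatcher<T> {
      private final long batchSizeBytes;
      private final List<T> batch = new ArrayList<>();
      private long batchedBytes;

      SizeBoundedBatcher(long batchSizeBytes) {
        this.batchSizeBytes = batchSizeBytes;
      }

      /** Adds one element; flushes once the accumulated size reaches the cap. */
      void add(T element, long elementBytes) {
        batch.add(element);
        batchedBytes += elementBytes;
        // With a cap of zero every element is flushed alone, which is why the
        // noBatching test expects two writeAtLeastOnce calls of size 1.
        if (batchedBytes >= batchSizeBytes) {
          flush();
        }
      }

      /** Called from add() and again when the bundle finishes. */
      void flush() {
        if (batch.isEmpty()) {
          return;
        }
        // The reverted DoFn writes here via databaseClient.writeAtLeastOnce(batch).
        batch.clear();
        batchedBytes = 0;
      }
    }

Note that a MutationGroup enters the batch as one unit, which is why the groups test sees a single write of three mutations even with a batch size of 1 byte.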

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java
deleted file mode 100644
index d866975..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner;
-
-import com.google.cloud.spanner.Database;
-import com.google.cloud.spanner.DatabaseAdminClient;
-import com.google.cloud.spanner.DatabaseClient;
-import com.google.cloud.spanner.DatabaseId;
-import com.google.cloud.spanner.Mutation;
-import com.google.cloud.spanner.Operation;
-import com.google.cloud.spanner.Spanner;
-import com.google.cloud.spanner.SpannerOptions;
-import com.google.cloud.spanner.Struct;
-import com.google.cloud.spanner.TimestampBound;
-import com.google.spanner.admin.database.v1.CreateDatabaseMetadata;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** End-to-end test of Cloud Spanner Source. */
-@RunWith(JUnit4.class)
-public class SpannerReadIT {
-
-  private static final int MAX_DB_NAME_LENGTH = 30;
-
-  @Rule public final transient TestPipeline p = TestPipeline.create();
-
-  /** Pipeline options for this test. */
-  public interface SpannerTestPipelineOptions extends TestPipelineOptions {
-    @Description("Instance ID to write to in Spanner")
-    @Default.String("beam-test")
-    String getInstanceId();
-    void setInstanceId(String value);
-
-    @Description("Database ID prefix to write to in Spanner")
-    @Default.String("beam-testdb")
-    String getDatabaseIdPrefix();
-    void setDatabaseIdPrefix(String value);
-
-    @Description("Table name")
-    @Default.String("users")
-    String getTable();
-    void setTable(String value);
-  }
-
-  private Spanner spanner;
-  private DatabaseAdminClient databaseAdminClient;
-  private SpannerTestPipelineOptions options;
-  private String databaseName;
-  private String project;
-
-  @Before
-  public void setUp() throws Exception {
-    PipelineOptionsFactory.register(SpannerTestPipelineOptions.class);
-    options = TestPipeline.testingPipelineOptions().as(SpannerTestPipelineOptions.class);
-
-    project = options.as(GcpOptions.class).getProject();
-
-    spanner = SpannerOptions.newBuilder().setProjectId(project).build().getService();
-
-    databaseName = generateDatabaseName();
-
-    databaseAdminClient = spanner.getDatabaseAdminClient();
-
-    // Drop the database if it already exists.
-    databaseAdminClient.dropDatabase(options.getInstanceId(), databaseName);
-
-    Operation<Database, CreateDatabaseMetadata> op =
-        databaseAdminClient.createDatabase(
-            options.getInstanceId(),
-            databaseName,
-            Collections.singleton(
-                "CREATE TABLE "
-                    + options.getTable()
-                    + " ("
-                    + "  Key           INT64,"
-                    + "  Value         STRING(MAX),"
-                    + ") PRIMARY KEY (Key)"));
-    op.waitFor();
-  }
-
-  @Test
-  public void testRead() throws Exception {
-    DatabaseClient databaseClient =
-        spanner.getDatabaseClient(
-            DatabaseId.of(
-                project, options.getInstanceId(), databaseName));
-
-    List<Mutation> mutations = new ArrayList<>();
-    for (int i = 0; i < 5; i++) {
-      mutations.add(
-          Mutation.newInsertOrUpdateBuilder(options.getTable())
-              .set("key")
-              .to((long) i)
-              .set("value")
-              .to(RandomUtils.randomAlphaNumeric(100))
-              .build());
-    }
-
-    databaseClient.writeAtLeastOnce(mutations);
-
-    SpannerConfig spannerConfig = SpannerConfig.create()
-        .withProjectId(project)
-        .withInstanceId(options.getInstanceId())
-        .withDatabaseId(databaseName);
-
-    PCollectionView<Transaction> tx =
-        p.apply(
-            SpannerIO.createTransaction()
-                .withSpannerConfig(spannerConfig)
-                .withTimestampBound(TimestampBound.strong()));
-
-    PCollection<Struct> output =
-        p.apply(
-            SpannerIO.read()
-                .withSpannerConfig(spannerConfig)
-                .withQuery("SELECT * FROM " + options.getTable())
-                .withTransaction(tx));
-    PAssert.thatSingleton(output.apply("Count rows", Count.<Struct>globally())).isEqualTo(5L);
-    p.run();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    databaseAdminClient.dropDatabase(options.getInstanceId(), databaseName);
-    spanner.close();
-  }
-
-  private String generateDatabaseName() {
-    String random = RandomUtils
-        .randomAlphaNumeric(MAX_DB_NAME_LENGTH - 1 - options.getDatabaseIdPrefix().length());
-    return options.getDatabaseIdPrefix() + "-" + random;
-  }
-}
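
A note on the read pattern this deleted IT exercised: SpannerIO.createTransaction() materializes a transaction as a PCollectionView so that every worker reads the table at one consistent timestamp. The plain client-side equivalent of a single strongly consistent read is sketched below using the public com.google.cloud.spanner API; the pipeline-level sharing of the snapshot across workers is the part the transform adds on top.

    import com.google.cloud.spanner.DatabaseClient;
    import com.google.cloud.spanner.ResultSet;
    import com.google.cloud.spanner.Statement;
    import com.google.cloud.spanner.TimestampBound;

    class SpannerSnapshotReadSketch {
      /**
       * One strongly consistent snapshot read. SpannerIO.createTransaction()
       * exists so that many workers share such a snapshot instead of each
       * opening their own, possibly at different timestamps.
       */
      static long countRows(DatabaseClient client, String table) {
        try (ResultSet rs =
            client.singleUse(TimestampBound.strong())
                .executeQuery(Statement.of("SELECT COUNT(*) FROM " + table))) {
          rs.next();
          return rs.getLong(0);
        }
      }
    }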

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java
index d208f5c..8df224b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java
@@ -33,7 +33,7 @@ import com.google.cloud.spanner.SpannerOptions;
 import com.google.cloud.spanner.Statement;
 import com.google.spanner.admin.database.v1.CreateDatabaseMetadata;
 import java.util.Collections;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -59,6 +60,11 @@ public class SpannerWriteIT {
 
   /** Pipeline options for this test. */
   public interface SpannerTestPipelineOptions extends TestPipelineOptions {
+    @Description("Project ID for Spanner")
+    @Default.String("apache-beam-testing")
+    String getProjectId();
+    void setProjectId(String value);
+
     @Description("Instance ID to write to in Spanner")
     @Default.String("beam-test")
     String getInstanceId();
@@ -79,16 +85,13 @@ public class SpannerWriteIT {
   private DatabaseAdminClient databaseAdminClient;
   private SpannerTestPipelineOptions options;
   private String databaseName;
-  private String project;
 
   @Before
   public void setUp() throws Exception {
     PipelineOptionsFactory.register(SpannerTestPipelineOptions.class);
     options = TestPipeline.testingPipelineOptions().as(SpannerTestPipelineOptions.class);
 
-    project = options.as(GcpOptions.class).getProject();
-
-    spanner = SpannerOptions.newBuilder().setProjectId(project).build().getService();
+    spanner = SpannerOptions.newBuilder().setProjectId(options.getProjectId()).build().getService();
 
     databaseName = generateDatabaseName();
 
@@ -112,8 +115,9 @@ public class SpannerWriteIT {
   }
 
   private String generateDatabaseName() {
-    String random = RandomUtils
-        .randomAlphaNumeric(MAX_DB_NAME_LENGTH - 1 - options.getDatabaseIdPrefix().length());
+    String random = RandomStringUtils
+        .randomAlphanumeric(MAX_DB_NAME_LENGTH - 1 - options.getDatabaseIdPrefix().length())
+        .toLowerCase();
     return options.getDatabaseIdPrefix() + "-" + random;
   }
 
@@ -123,7 +127,7 @@ public class SpannerWriteIT {
         .apply(ParDo.of(new GenerateMutations(options.getTable())))
         .apply(
             SpannerIO.write()
-                .withProjectId(project)
+                .withProjectId(options.getProjectId())
                 .withInstanceId(options.getInstanceId())
                 .withDatabaseId(databaseName));
 
@@ -131,7 +135,7 @@ public class SpannerWriteIT {
     DatabaseClient databaseClient =
         spanner.getDatabaseClient(
             DatabaseId.of(
-                project, options.getInstanceId(), databaseName));
+                options.getProjectId(), options.getInstanceId(), databaseName));
 
     ResultSet resultSet =
         databaseClient
@@ -145,7 +149,7 @@ public class SpannerWriteIT {
   @After
   public void tearDown() throws Exception {
     databaseAdminClient.dropDatabase(options.getInstanceId(), databaseName);
-    spanner.close();
+    spanner.closeAsync().get();
   }
 
   private static class GenerateMutations extends DoFn<Long, Mutation> {
@@ -161,7 +165,7 @@ public class SpannerWriteIT {
       Mutation.WriteBuilder builder = Mutation.newInsertOrUpdateBuilder(table);
       Long key = c.element();
       builder.set("Key").to(key);
-      builder.set("Value").to(RandomUtils.randomAlphaNumeric(valueSize));
+      builder.set("Value").to(RandomStringUtils.randomAlphabetic(valueSize));
       Mutation mutation = builder.build();
       c.output(mutation);
     }
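
The swap above from the removed RandomUtils helper to commons-lang3 RandomStringUtils also adds toLowerCase() in generateDatabaseName, which matters because Cloud Spanner rejects uppercase letters in database IDs. A quick self-contained check of the generated ID against the documented shape, assumed here as 2 to 30 characters, a lowercase first letter, then lowercase letters, digits, hyphens, or underscores:

    import java.util.regex.Pattern;
    import org.apache.commons.lang3.RandomStringUtils;

    class DatabaseNameCheck {
      // Assumed Spanner database-ID pattern; see the Spanner DDL docs.
      private static final Pattern DB_ID =
          Pattern.compile("[a-z][a-z0-9_-]{0,28}[a-z0-9]");

      public static void main(String[] args) {
        // Mirrors generateDatabaseName above: pad the prefix with random
        // characters up to the 30-character limit, lowercased.
        String prefix = "beam-testdb";
        String name = prefix + "-"
            + RandomStringUtils.randomAlphanumeric(30 - 1 - prefix.length())
                .toLowerCase();
        System.out.println(name + " valid? " + DB_ID.matcher(name).matches());
      }
    }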

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hadoop-common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/pom.xml b/sdks/java/io/hadoop-common/pom.xml
index 4bcbcd7..8749243 100644
--- a/sdks/java/io/hadoop-common/pom.xml
+++ b/sdks/java/io/hadoop-common/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hadoop-file-system/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/pom.xml b/sdks/java/io/hadoop-file-system/pom.xml
index a9c2e57..db5a1db 100644
--- a/sdks/java/io/hadoop-file-system/pom.xml
+++ b/sdks/java/io/hadoop-file-system/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
@@ -44,6 +44,37 @@
     </plugins>
   </build>
 
+  <properties>
+    <!--
+      This is the version of Hadoop used to compile the hadoop-common module.
+      This dependency is defined with a provided scope.
+      Users must supply their own Hadoop version at runtime.
+    -->
+    <hadoop.version>2.7.3</hadoop.version>
+  </properties>
+
+  <dependencyManagement>
+    <!--
+       We define dependencies here instead of sdks/java/io because
+       of a version mismatch between this Hadoop version and other
+       Hadoop versions declared in other io submodules.
+    -->
+    <dependencies>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-hdfs</artifactId>
+        <classifier>tests</classifier>
+        <version>${hadoop.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-minicluster</artifactId>
+        <version>${hadoop.version}</version>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.beam</groupId>

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hadoop/input-format/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/pom.xml b/sdks/java/io/hadoop/input-format/pom.xml
index 0953119..06f9f11 100644
--- a/sdks/java/io/hadoop/input-format/pom.xml
+++ b/sdks/java/io/hadoop/input-format/pom.xml
@@ -20,7 +20,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-hadoop-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
   <artifactId>beam-sdks-java-io-hadoop-input-format</artifactId>

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
index 0b4c23f..efd47fd 100644
--- a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
+++ b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
@@ -166,7 +166,7 @@ import org.slf4j.LoggerFactory;
  * }
  * </pre>
  */
-@Experimental(Experimental.Kind.SOURCE_SINK)
+@Experimental
 public class HadoopInputFormatIO {
   private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormatIO.class);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hadoop/jdk1.8-tests/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/pom.xml b/sdks/java/io/hadoop/jdk1.8-tests/pom.xml
index 12944f4..9f84e88 100644
--- a/sdks/java/io/hadoop/jdk1.8-tests/pom.xml
+++ b/sdks/java/io/hadoop/jdk1.8-tests/pom.xml
@@ -26,7 +26,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-hadoop-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
   <artifactId>beam-sdks-java-io-hadoop-jdk1.8-tests</artifactId>
@@ -108,11 +108,13 @@
         <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-streaming_2.10</artifactId>
+          <version>${spark.version}</version>
           <scope>runtime</scope>
         </dependency>
         <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-core_2.10</artifactId>
+          <version>${spark.version}</version>
           <scope>runtime</scope>
           <exclusions>
             <exclusion>

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java
index 3f866a4..8745521 100644
--- a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java
+++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.hadoop.inputformat;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-import java.net.ServerSocket;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -77,7 +76,7 @@ public class HIFIOWithElasticTest implements Serializable {
   private static final long serialVersionUID = 1L;
   private static final Logger LOG = LoggerFactory.getLogger(HIFIOWithElasticTest.class);
   private static final String ELASTIC_IN_MEM_HOSTNAME = "127.0.0.1";
-  private static String elasticInMemPort = "9200";
+  private static final String ELASTIC_IN_MEM_PORT = "9200";
   private static final String ELASTIC_INTERNAL_VERSION = "5.x";
   private static final String TRUE = "true";
   private static final String ELASTIC_INDEX_NAME = "beamdb";
@@ -95,10 +94,6 @@ public class HIFIOWithElasticTest implements Serializable {
   @BeforeClass
   public static void startServer()
       throws NodeValidationException, InterruptedException, IOException {
-    ServerSocket serverSocket = new ServerSocket(0);
-    int port = serverSocket.getLocalPort();
-    serverSocket.close();
-    elasticInMemPort = String.valueOf(port);
     ElasticEmbeddedServer.startElasticEmbeddedServer();
   }
 
@@ -178,7 +173,7 @@ public class HIFIOWithElasticTest implements Serializable {
   public Configuration getConfiguration() {
     Configuration conf = new Configuration();
     conf.set(ConfigurationOptions.ES_NODES, ELASTIC_IN_MEM_HOSTNAME);
-    conf.set(ConfigurationOptions.ES_PORT, String.format("%s", elasticInMemPort));
+    conf.set(ConfigurationOptions.ES_PORT, String.format("%s", ELASTIC_IN_MEM_PORT));
     conf.set(ConfigurationOptions.ES_RESOURCE, ELASTIC_RESOURCE);
     conf.set("es.internal.es.version", ELASTIC_INTERNAL_VERSION);
     conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, TRUE);
@@ -214,7 +209,7 @@ public class HIFIOWithElasticTest implements Serializable {
       Settings settings = Settings.builder()
           .put("node.data", TRUE)
           .put("network.host", ELASTIC_IN_MEM_HOSTNAME)
-          .put("http.port", elasticInMemPort)
+          .put("http.port", ELASTIC_IN_MEM_PORT)
           .put("path.data", elasticTempFolder.getRoot().getPath())
           .put("path.home", elasticTempFolder.getRoot().getPath())
           .put("transport.type", "local")

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/pom.xml b/sdks/java/io/hadoop/pom.xml
index bc3569d..a1c7a2e 100644
--- a/sdks/java/io/hadoop/pom.xml
+++ b/sdks/java/io/hadoop/pom.xml
@@ -20,7 +20,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
   <packaging>pom</packaging>

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hbase/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/pom.xml b/sdks/java/io/hbase/pom.xml
index 40f516a..746b993 100644
--- a/sdks/java/io/hbase/pom.xml
+++ b/sdks/java/io/hbase/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
@@ -31,7 +31,8 @@
   <description>Library to read and write from/to HBase</description>
 
   <properties>
-    <hbase.version>1.2.6</hbase.version>
+    <hbase.version>1.2.5</hbase.version>
+    <hbase.hadoop.version>2.5.1</hbase.hadoop.version>
   </properties>
 
   <build>
@@ -63,12 +64,6 @@
     </dependency>
 
     <dependency>
-      <groupId>com.google.auto.service</groupId>
-      <artifactId>auto-service</artifactId>
-      <optional>true</optional>
-    </dependency>
-
-    <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-shaded-client</artifactId>
       <version>${hbase.version}</version>
@@ -108,26 +103,15 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-minicluster</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-hdfs</artifactId>
+      <version>${hbase.hadoop.version}</version>
       <scope>test</scope>
     </dependency>
 
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
+      <version>${hbase.hadoop.version}</version>
       <scope>test</scope>
-      <exclusions>
-        <!-- Fix build on JDK-9 -->
-        <exclusion>
-          <groupId>jdk.tools</groupId>
-          <artifactId>jdk.tools</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
deleted file mode 100644
index 2973d1b..0000000
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hbase;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableList;
-import java.util.List;
-import org.apache.beam.sdk.coders.CoderProvider;
-import org.apache.beam.sdk.coders.CoderProviderRegistrar;
-import org.apache.beam.sdk.coders.CoderProviders;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.hadoop.hbase.client.Result;
-
-/**
- * A {@link CoderProviderRegistrar} for standard types used with {@link HBaseIO}.
- */
-@AutoService(CoderProviderRegistrar.class)
-public class HBaseCoderProviderRegistrar implements CoderProviderRegistrar {
-  @Override
-  public List<CoderProvider> getCoderProviders() {
-    return ImmutableList.of(
-      HBaseMutationCoder.getCoderProvider(),
-      CoderProviders.forCoder(TypeDescriptor.of(Result.class), HBaseResultCoder.of()));
-  }
-}
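
Context for the deletion above: @AutoService(CoderProviderRegistrar.class) writes a META-INF/services entry at compile time, and the Beam SDK then discovers the registrar through java.util.ServiceLoader, so deleting the class is enough to unregister the HBase coders with no central list to edit. Below is a minimal sketch of the lookup side using plain ServiceLoader; the exact registry wiring inside the SDK is not shown here.

    import java.util.ServiceLoader;
    import org.apache.beam.sdk.coders.CoderProvider;
    import org.apache.beam.sdk.coders.CoderProviderRegistrar;

    class RegistrarLookupSketch {
      public static void main(String[] args) {
        // Iterates every registrar named in META-INF/services files on the
        // classpath, which is exactly what @AutoService generates per class.
        for (CoderProviderRegistrar registrar
            : ServiceLoader.load(CoderProviderRegistrar.class)) {
          for (CoderProvider provider : registrar.getCoderProviders()) {
            System.out.println(registrar.getClass().getName() + " -> " + provider);
          }
        }
      }
    }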