You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/20 01:10:50 UTC

[2/4] incubator-beam git commit: DatastoreIO v1beta3 to v1

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
deleted file mode 100644
index b0c6c18..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
+++ /dev/null
@@ -1,792 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.datastore;
-
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DATASTORE_BATCH_UPDATE_LIMIT;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.DEFAULT_BUNDLE_SIZE_BYTES;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.QUERY_BATCH_LIMIT;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.getEstimatedSizeBytes;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.makeRequest;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.isValidKey;
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static com.google.datastore.v1beta3.PropertyFilter.Operator.EQUAL;
-import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeDelete;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.hasItem;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DatastoreWriterFn;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteEntity;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteEntityFn;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteKey;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteKeyFn;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.ReadFn;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.SplitQueryFn;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.V1Beta3Options;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.UpsertFn;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.V1Beta3DatastoreFactory;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Write;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.testing.RunnableOnService;
-import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.transforms.DoFnTester.CloningBehavior;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.POutput;
-
-import com.google.datastore.v1beta3.CommitRequest;
-import com.google.datastore.v1beta3.Entity;
-import com.google.datastore.v1beta3.EntityResult;
-import com.google.datastore.v1beta3.Key;
-import com.google.datastore.v1beta3.Mutation;
-import com.google.datastore.v1beta3.PartitionId;
-import com.google.datastore.v1beta3.Query;
-import com.google.datastore.v1beta3.QueryResultBatch;
-import com.google.datastore.v1beta3.RunQueryRequest;
-import com.google.datastore.v1beta3.RunQueryResponse;
-import com.google.datastore.v1beta3.client.Datastore;
-import com.google.datastore.v1beta3.client.QuerySplitter;
-import com.google.protobuf.Int32Value;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Tests for {@link V1Beta3}.
- */
-@RunWith(JUnit4.class)
-public class V1Beta3Test {
-  private static final String PROJECT_ID = "testProject";
-  private static final String NAMESPACE = "testNamespace";
-  private static final String KIND = "testKind";
-  private static final Query QUERY;
-  private static final V1Beta3Options v1Beta3Options;
-  static {
-    Query.Builder q = Query.newBuilder();
-    q.addKindBuilder().setName(KIND);
-    QUERY = q.build();
-    v1Beta3Options = V1Beta3Options.from(PROJECT_ID, QUERY, NAMESPACE);
-  }
-  private V1Beta3.Read initialRead;
-
-  @Mock
-  Datastore mockDatastore;
-  @Mock
-  QuerySplitter mockQuerySplitter;
-  @Mock
-  V1Beta3DatastoreFactory mockDatastoreFactory;
-
-  @Rule
-  public final ExpectedException thrown = ExpectedException.none();
-
-  @Before
-  public void setUp() {
-    MockitoAnnotations.initMocks(this);
-
-    initialRead = DatastoreIO.v1beta3().read()
-        .withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
-
-    when(mockDatastoreFactory.getDatastore(any(PipelineOptions.class), any(String.class)))
-        .thenReturn(mockDatastore);
-    when(mockDatastoreFactory.getQuerySplitter())
-        .thenReturn(mockQuerySplitter);
-  }
-
-  @Test
-  public void testBuildRead() throws Exception {
-    V1Beta3.Read read = DatastoreIO.v1beta3().read()
-        .withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
-    assertEquals(QUERY, read.getQuery());
-    assertEquals(PROJECT_ID, read.getProjectId());
-    assertEquals(NAMESPACE, read.getNamespace());
-  }
-
-  /**
-   * {@link #testBuildRead} but constructed in a different order.
-   */
-  @Test
-  public void testBuildReadAlt() throws Exception {
-    V1Beta3.Read read =  DatastoreIO.v1beta3().read()
-        .withProjectId(PROJECT_ID).withNamespace(NAMESPACE).withQuery(QUERY);
-    assertEquals(QUERY, read.getQuery());
-    assertEquals(PROJECT_ID, read.getProjectId());
-    assertEquals(NAMESPACE, read.getNamespace());
-  }
-
-  @Test
-  public void testReadValidationFailsProject() throws Exception {
-    V1Beta3.Read read =  DatastoreIO.v1beta3().read().withQuery(QUERY);
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("project");
-    read.validate(null);
-  }
-
-  @Test
-  public void testReadValidationFailsQuery() throws Exception {
-    V1Beta3.Read read =  DatastoreIO.v1beta3().read().withProjectId(PROJECT_ID);
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("query");
-    read.validate(null);
-  }
-
-  @Test
-  public void testReadValidationFailsQueryLimitZero() throws Exception {
-    Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(0)).build();
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Invalid query limit 0: must be positive");
-
-    DatastoreIO.v1beta3().read().withQuery(invalidLimit);
-  }
-
-  @Test
-  public void testReadValidationFailsQueryLimitNegative() throws Exception {
-    Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(-5)).build();
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Invalid query limit -5: must be positive");
-
-    DatastoreIO.v1beta3().read().withQuery(invalidLimit);
-  }
-
-  @Test
-  public void testReadValidationSucceedsNamespace() throws Exception {
-    V1Beta3.Read read =  DatastoreIO.v1beta3().read().withProjectId(PROJECT_ID).withQuery(QUERY);
-    /* Should succeed, as a null namespace is fine. */
-    read.validate(null);
-  }
-
-  @Test
-  public void testReadDisplayData() {
-    V1Beta3.Read read =  DatastoreIO.v1beta3().read()
-      .withProjectId(PROJECT_ID)
-      .withQuery(QUERY)
-      .withNamespace(NAMESPACE);
-
-    DisplayData displayData = DisplayData.from(read);
-
-    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
-    assertThat(displayData, hasDisplayItem("query", QUERY.toString()));
-    assertThat(displayData, hasDisplayItem("namespace", NAMESPACE));
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testSourcePrimitiveDisplayData() {
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    PTransform<PBegin, ? extends POutput> read = DatastoreIO.v1beta3().read().withProjectId(
-        "myProject").withQuery(Query.newBuilder().build());
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
-    assertThat("DatastoreIO read should include the project in its primitive display data",
-        displayData, hasItem(hasDisplayItem("projectId")));
-  }
-
-  @Test
-  public void testWriteDoesNotAllowNullProject() throws Exception {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("projectId");
-
-    DatastoreIO.v1beta3().write().withProjectId(null);
-  }
-
-  @Test
-  public void testWriteValidationFailsWithNoProject() throws Exception {
-    Write write =  DatastoreIO.v1beta3().write();
-
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("projectId");
-
-    write.validate(null);
-  }
-
-  @Test
-  public void testWriteValidationSucceedsWithProject() throws Exception {
-    Write write =  DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID);
-    write.validate(null);
-  }
-
-  @Test
-  public void testWriteDisplayData() {
-    Write write =  DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID);
-
-    DisplayData displayData = DisplayData.from(write);
-
-    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
-  }
-
-  @Test
-  public void testDeleteEntityDoesNotAllowNullProject() throws Exception {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("projectId");
-
-    DatastoreIO.v1beta3().deleteEntity().withProjectId(null);
-  }
-
-  @Test
-  public void testDeleteEntityValidationFailsWithNoProject() throws Exception {
-    DeleteEntity deleteEntity = DatastoreIO.v1beta3().deleteEntity();
-
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("projectId");
-
-    deleteEntity.validate(null);
-  }
-
-  @Test
-  public void testDeleteEntityValidationSucceedsWithProject() throws Exception {
-    DeleteEntity deleteEntity = DatastoreIO.v1beta3().deleteEntity().withProjectId(PROJECT_ID);
-    deleteEntity.validate(null);
-  }
-
-  @Test
-  public void testDeleteEntityDisplayData() {
-    DeleteEntity deleteEntity =  DatastoreIO.v1beta3().deleteEntity().withProjectId(PROJECT_ID);
-
-    DisplayData displayData = DisplayData.from(deleteEntity);
-
-    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
-  }
-
-  @Test
-  public void testDeleteKeyDoesNotAllowNullProject() throws Exception {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("projectId");
-
-    DatastoreIO.v1beta3().deleteKey().withProjectId(null);
-  }
-
-  @Test
-  public void testDeleteKeyValidationFailsWithNoProject() throws Exception {
-    DeleteKey deleteKey = DatastoreIO.v1beta3().deleteKey();
-
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("projectId");
-
-    deleteKey.validate(null);
-  }
-
-  @Test
-  public void testDeleteKeyValidationSucceedsWithProject() throws Exception {
-    DeleteKey deleteKey = DatastoreIO.v1beta3().deleteKey().withProjectId(PROJECT_ID);
-    deleteKey.validate(null);
-  }
-
-  @Test
-  public void testDeleteKeyDisplayData() {
-    DeleteKey deleteKey =  DatastoreIO.v1beta3().deleteKey().withProjectId(PROJECT_ID);
-
-    DisplayData displayData = DisplayData.from(deleteKey);
-
-    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testWritePrimitiveDisplayData() {
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    PTransform<PCollection<Entity>, ?> write =
-        DatastoreIO.v1beta3().write().withProjectId("myProject");
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
-    assertThat("DatastoreIO write should include the project in its primitive display data",
-        displayData, hasItem(hasDisplayItem("projectId")));
-    assertThat("DatastoreIO write should include the upsertFn in its primitive display data",
-        displayData, hasItem(hasDisplayItem("upsertFn")));
-
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testDeleteEntityPrimitiveDisplayData() {
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    PTransform<PCollection<Entity>, ?> write =
-        DatastoreIO.v1beta3().deleteEntity().withProjectId("myProject");
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
-    assertThat("DatastoreIO write should include the project in its primitive display data",
-        displayData, hasItem(hasDisplayItem("projectId")));
-    assertThat("DatastoreIO write should include the deleteEntityFn in its primitive display data",
-        displayData, hasItem(hasDisplayItem("deleteEntityFn")));
-
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testDeleteKeyPrimitiveDisplayData() {
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    PTransform<PCollection<Key>, ?> write =
-        DatastoreIO.v1beta3().deleteKey().withProjectId("myProject");
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
-    assertThat("DatastoreIO write should include the project in its primitive display data",
-        displayData, hasItem(hasDisplayItem("projectId")));
-    assertThat("DatastoreIO write should include the deleteKeyFn in its primitive display data",
-        displayData, hasItem(hasDisplayItem("deleteKeyFn")));
-
-  }
-
-  /**
-   * Test building a Write using builder methods.
-   */
-  @Test
-  public void testBuildWrite() throws Exception {
-    V1Beta3.Write write =  DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID);
-    assertEquals(PROJECT_ID, write.getProjectId());
-  }
-
-  /**
-   * Test the detection of complete and incomplete keys.
-   */
-  @Test
-  public void testHasNameOrId() {
-    Key key;
-    // Complete with name, no ancestor
-    key = makeKey("bird", "finch").build();
-    assertTrue(isValidKey(key));
-
-    // Complete with id, no ancestor
-    key = makeKey("bird", 123).build();
-    assertTrue(isValidKey(key));
-
-    // Incomplete, no ancestor
-    key = makeKey("bird").build();
-    assertFalse(isValidKey(key));
-
-    // Complete with name and ancestor
-    key = makeKey("bird", "owl").build();
-    key = makeKey(key, "bird", "horned").build();
-    assertTrue(isValidKey(key));
-
-    // Complete with id and ancestor
-    key = makeKey("bird", "owl").build();
-    key = makeKey(key, "bird", 123).build();
-    assertTrue(isValidKey(key));
-
-    // Incomplete with ancestor
-    key = makeKey("bird", "owl").build();
-    key = makeKey(key, "bird").build();
-    assertFalse(isValidKey(key));
-
-    key = makeKey().build();
-    assertFalse(isValidKey(key));
-  }
-
-  /**
-   * Test that entities with incomplete keys cannot be updated.
-   */
-  @Test
-  public void testAddEntitiesWithIncompleteKeys() throws Exception {
-    Key key = makeKey("bird").build();
-    Entity entity = Entity.newBuilder().setKey(key).build();
-    UpsertFn upsertFn = new UpsertFn();
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Entities to be written to the Datastore must have complete keys");
-
-    upsertFn.apply(entity);
-  }
-
-  /**
-   * Test that entities with valid keys are transformed to upsert mutations.
-   */
-  @Test
-  public void testAddEntities() throws Exception {
-    Key key = makeKey("bird", "finch").build();
-    Entity entity = Entity.newBuilder().setKey(key).build();
-    UpsertFn upsertFn = new UpsertFn();
-
-    Mutation expectedMutation = makeUpsert(entity).build();
-    assertEquals(expectedMutation, upsertFn.apply(entity)); // expected value goes first
-  }
-
-  /**
-   * Test that entities with incomplete keys cannot be deleted.
-   */
-  @Test
-  public void testDeleteEntitiesWithIncompleteKeys() throws Exception {
-    Key key = makeKey("bird").build();
-    Entity entity = Entity.newBuilder().setKey(key).build();
-    DeleteEntityFn deleteEntityFn = new DeleteEntityFn();
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Entities to be deleted from the Datastore must have complete keys");
-
-    deleteEntityFn.apply(entity);
-  }
-
-  /**
-   * Test that entities with valid keys are transformed to delete mutations.
-   */
-  @Test
-  public void testDeleteEntities() throws Exception {
-    Key key = makeKey("bird", "finch").build();
-    Entity entity = Entity.newBuilder().setKey(key).build();
-    DeleteEntityFn deleteEntityFn = new DeleteEntityFn();
-
-    Mutation expectedMutation = makeDelete(entity.getKey()).build();
-    assertEquals(expectedMutation, deleteEntityFn.apply(entity)); // expected value goes first
-  }
-
-  /**
-   * Test that incomplete keys cannot be deleted.
-   */
-  @Test
-  public void testDeleteIncompleteKeys() throws Exception {
-    Key key = makeKey("bird").build();
-    DeleteKeyFn deleteKeyFn = new DeleteKeyFn();
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Keys to be deleted from the Datastore must be complete");
-
-    deleteKeyFn.apply(key);
-  }
-
-  /**
-   * Test that valid keys are transformed to delete mutations.
-   */
-  @Test
-  public void testDeleteKeys() throws Exception {
-    Key key = makeKey("bird", "finch").build();
-    DeleteKeyFn deleteKeyFn = new DeleteKeyFn();
-
-    Mutation expectedMutation = makeDelete(key).build();
-    assertEquals(expectedMutation, deleteKeyFn.apply(key)); // expected value goes first
-  }
-
-  @Test
-  public void testDatastoreWriteFnDisplayData() {
-    DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID);
-    DisplayData displayData = DisplayData.from(datastoreWriter);
-    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
-  }
-
-  /** Tests {@link DatastoreWriterFn} with entities less than one batch. */
-  @Test
-  public void testDatatoreWriterFnWithOneBatch() throws Exception {
-    datastoreWriterFnTest(100);
-  }
-
-  /** Tests {@link DatastoreWriterFn} with more than one batch of entities, but not a multiple. */
-  @Test
-  public void testDatatoreWriterFnWithMultipleBatches() throws Exception {
-    datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 3 + 100);
-  }
-
-  /**
-   * Tests {@link DatastoreWriterFn} with entities of several batches, using an exact multiple of
-   * write batch size.
-   */
-  @Test
-  public void testDatatoreWriterFnWithBatchesExactMultiple() throws Exception {
-    datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 2);
-  }
-
-  // Helper that feeds numMutations upserts to DatastoreWriterFn and checks each commit batch.
-  private void datastoreWriterFnTest(int numMutations) throws Exception {
-    // Build one upsert mutation per requested entity.
-    List<Mutation> upserts = new ArrayList<>(numMutations);
-    for (int index = 0; index < numMutations; ++index) {
-      Entity entity = Entity.newBuilder().setKey(makeKey("key" + index, index + 1)).build();
-      upserts.add(makeUpsert(entity).build());
-    }
-
-    // Run the writer over the whole bundle; cloning is disabled for the mock-backed function.
-    DoFnTester<Mutation, Void> tester =
-        DoFnTester.of(new DatastoreWriterFn(PROJECT_ID, mockDatastoreFactory));
-    tester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
-    tester.processBundle(upserts);
-
-    // Verify that every batch was committed exactly once with the expected mutations.
-    for (int start = 0; start < numMutations; start += DATASTORE_BATCH_UPDATE_LIMIT) {
-      int end = Math.min(numMutations, start + DATASTORE_BATCH_UPDATE_LIMIT);
-      CommitRequest expectedCommit = CommitRequest.newBuilder()
-          .setMode(CommitRequest.Mode.NON_TRANSACTIONAL)
-          .addAllMutations(upserts.subList(start, end))
-          .build();
-      verify(mockDatastore, times(1)).commit(expectedCommit);
-    }
-  }
-
-  /**
-   * Tests {@link V1Beta3.Read#getEstimatedSizeBytes} to fetch and return estimated size for a
-   * query.
-   */
-  @Test
-  public void testEstimatedSizeBytes() throws Exception {
-    long entityBytes = 100L;
-    // Per Kind statistics request and response
-    RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE), NAMESPACE);
-    RunQueryResponse statResponse = makeStatKindResponse(entityBytes);
-
-    when(mockDatastore.runQuery(statRequest))
-        .thenReturn(statResponse);
-
-    assertEquals(entityBytes, getEstimatedSizeBytes(mockDatastore, QUERY, NAMESPACE));
-    verify(mockDatastore, times(1)).runQuery(statRequest);
-  }
-
-  /**
-   * Tests {@link SplitQueryFn} when the number of query splits is specified.
-   */
-  @Test
-  public void testSplitQueryFnWithNumSplits() throws Exception {
-    int numSplits = 100;
-    when(mockQuerySplitter.getSplits(
-        eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class)))
-        .thenReturn(splitQuery(QUERY, numSplits));
-
-    SplitQueryFn splitQueryFn = new SplitQueryFn(v1Beta3Options, numSplits, mockDatastoreFactory);
-    DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn);
-    /*
-     * Although Datastore client is marked transient in SplitQueryFn, when injected through
-     * mock factory using a when clause for unit testing purposes, it is not serializable
-     * because it doesn't have a no-arg constructor. Thus disabling the cloning to prevent the
-     * doFn from being serialized.
-     */
-    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
-    List<KV<Integer, Query>> queries = doFnTester.processBundle(QUERY);
-
-    assertEquals(numSplits, queries.size()); // expected value goes first
-    verifyUniqueKeys(queries);
-    verify(mockQuerySplitter, times(1)).getSplits(
-        eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class));
-    verifyZeroInteractions(mockDatastore);
-  }
-
-  /**
-   * Tests {@link SplitQueryFn} when the number of query splits is not specified.
-   */
-  @Test
-  public void testSplitQueryFnWithoutNumSplits() throws Exception {
-    // Force SplitQueryFn to compute the number of query splits
-    int numSplits = 0;
-    int expectedNumSplits = 20;
-    long entityBytes = expectedNumSplits * DEFAULT_BUNDLE_SIZE_BYTES;
-
-    // Per Kind statistics request and response
-    RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE), NAMESPACE);
-    RunQueryResponse statResponse = makeStatKindResponse(entityBytes);
-
-    when(mockDatastore.runQuery(statRequest))
-        .thenReturn(statResponse);
-    when(mockQuerySplitter.getSplits(
-        eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class)))
-        .thenReturn(splitQuery(QUERY, expectedNumSplits));
-
-    SplitQueryFn splitQueryFn = new SplitQueryFn(v1Beta3Options, numSplits, mockDatastoreFactory);
-    DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn);
-    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
-    List<KV<Integer, Query>> queries = doFnTester.processBundle(QUERY);
-
-    assertEquals(expectedNumSplits, queries.size()); // expected value goes first
-    verifyUniqueKeys(queries);
-    verify(mockQuerySplitter, times(1)).getSplits(
-        eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class));
-    verify(mockDatastore, times(1)).runQuery(statRequest);
-  }
-
-  /**
-   * Tests {@link V1Beta3.Read.SplitQueryFn} when the query has a user specified limit.
-   */
-  @Test
-  public void testSplitQueryFnWithQueryLimit() throws Exception {
-    Query queryWithLimit = QUERY.toBuilder().clone()
-        .setLimit(Int32Value.newBuilder().setValue(1))
-        .build();
-
-    SplitQueryFn splitQueryFn = new SplitQueryFn(v1Beta3Options, 10, mockDatastoreFactory);
-    DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn);
-    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
-    List<KV<Integer, Query>> queries = doFnTester.processBundle(queryWithLimit);
-
-    assertEquals(1, queries.size()); // expected value goes first
-    verifyUniqueKeys(queries);
-    verifyNoMoreInteractions(mockDatastore);
-    verifyNoMoreInteractions(mockQuerySplitter);
-  }
-
-  /** Tests {@link ReadFn} with a query limit less than one batch. */
-  @Test
-  public void testReadFnWithOneBatch() throws Exception {
-    readFnTest(5);
-  }
-
-  /** Tests {@link ReadFn} with a query limit more than one batch, and not a multiple. */
-  @Test
-  public void testReadFnWithMultipleBatches() throws Exception {
-    readFnTest(QUERY_BATCH_LIMIT + 5);
-  }
-
-  /** Tests {@link ReadFn} for several batches, using an exact multiple of batch size results. */
-  @Test
-  public void testReadFnWithBatchesExactMultiple() throws Exception {
-    readFnTest(5 * QUERY_BATCH_LIMIT);
-  }
-
-  /** Helper Methods */
-
-  /** A helper function that verifies that all the given queries have unique keys. */
-  private void verifyUniqueKeys(List<KV<Integer, Query>> queries) {
-    Set<Integer> keys = new HashSet<>();
-    for (KV<Integer, Query> kv : queries) {
-      keys.add(kv.getKey());
-    }
-    assertEquals(queries.size(), keys.size()); // duplicate keys shrink the set below the expected size
-  }
-
-  /**
-   * A helper function that creates mock {@link Entity} results in response to a query. Always
-   * indicates that more results are available, unless the batch is limited to fewer than
-   * {@link V1Beta3.Read#QUERY_BATCH_LIMIT} results.
-   */
-  private static RunQueryResponse mockResponseForQuery(Query q) {
-    // Every query V1Beta3 sends should have a limit.
-    assertTrue(q.hasLimit());
-
-    // The limit should be in the range [1, QUERY_BATCH_LIMIT]
-    int limit = q.getLimit().getValue();
-    assertThat(limit, greaterThanOrEqualTo(1));
-    assertThat(limit, lessThanOrEqualTo(QUERY_BATCH_LIMIT));
-
-    // Create the requested number of entities.
-    List<EntityResult> entities = new ArrayList<>(limit);
-    for (int i = 0; i < limit; ++i) {
-      entities.add(
-          EntityResult.newBuilder()
-              .setEntity(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)))
-              .build());
-    }
-
-    // Fill out the other parameters on the returned result batch.
-    RunQueryResponse.Builder ret = RunQueryResponse.newBuilder();
-    ret.getBatchBuilder()
-        .addAllEntityResults(entities)
-        .setEntityResultType(EntityResult.ResultType.FULL)
-        .setMoreResults(
-            limit == QUERY_BATCH_LIMIT
-                ? QueryResultBatch.MoreResultsType.NOT_FINISHED
-                : QueryResultBatch.MoreResultsType.NO_MORE_RESULTS);
-
-    return ret.build();
-  }
-
-  /** Helper function to run a test reading from a {@link ReadFn}. */
-  private void readFnTest(int numEntities) throws Exception {
-    // An empty query to read entities.
-    Query query = Query.newBuilder().setLimit(
-        Int32Value.newBuilder().setValue(numEntities)).build();
-
-    // Use mockResponseForQuery to generate results.
-    when(mockDatastore.runQuery(any(RunQueryRequest.class)))
-        .thenAnswer(new Answer<RunQueryResponse>() {
-          @Override
-          public RunQueryResponse answer(InvocationOnMock invocationOnMock) throws Throwable {
-            Query q = ((RunQueryRequest) invocationOnMock.getArguments()[0]).getQuery();
-            return mockResponseForQuery(q);
-          }
-        });
-
-    ReadFn readFn = new ReadFn(v1Beta3Options, mockDatastoreFactory);
-    DoFnTester<Query, Entity> doFnTester = DoFnTester.of(readFn);
-    /*
-     * Although Datastore client is marked transient in ReadFn, when injected through
-     * mock factory using a when clause for unit testing purposes, it is not serializable
-     * because it doesn't have a no-arg constructor. Thus disabling the cloning to prevent the
-     * test object from being serialized.
-     */
-    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
-    List<Entity> entities = doFnTester.processBundle(query);
-
-    int expectedNumCallsToRunQuery = (int) Math.ceil((double) numEntities / QUERY_BATCH_LIMIT);
-    verify(mockDatastore, times(expectedNumCallsToRunQuery)).runQuery(any(RunQueryRequest.class));
-    // Validate the number of results.
-    assertEquals(numEntities, entities.size());
-  }
-
-  /** Creates a response with a single statistics entity whose entity_bytes is the given size. */
-  private static RunQueryResponse makeStatKindResponse(long entitySizeInBytes) {
-    // Statistics entity carrying only the size property.
-    Entity.Builder statEntity =
-        Entity.newBuilder().setKey(makeKey("dummyKind", "dummyId"));
-    statEntity.getMutableProperties().put("entity_bytes", makeValue(entitySizeInBytes).build());
-
-    // Wrap the entity into a one-result batch, then into the response.
-    QueryResultBatch.Builder statBatch =
-        QueryResultBatch.newBuilder()
-            .addEntityResults(EntityResult.newBuilder().setEntity(statEntity));
-    return RunQueryResponse.newBuilder().setBatch(statBatch).build();
-  }
-
-  /** Builds a query for the latest statistics entity of {@code KIND}; namespace may be null. */
-  private static Query makeStatKindQuery(String namespace) {
-    Query.Builder statQuery = Query.newBuilder();
-    if (namespace == null) {
-      statQuery.addKindBuilder().setName("__Stat_Kind__");
-    } else {
-      statQuery.addKindBuilder().setName("__Ns_Stat_Kind__");
-    }
-    statQuery.setFilter(makeFilter("kind_name", EQUAL, makeValue(KIND)).build());
-    statQuery.addOrder(makeOrder("timestamp", DESCENDING)); // newest statistics first
-    statQuery.setLimit(Int32Value.newBuilder().setValue(1)); // only the first result is needed
-    return statQuery.build();
-  }
-
-  /** Generates {@code numSplits} dummy query splits, each an equal copy of {@code query}. */
-  private List<Query> splitQuery(Query query, int numSplits) {
-    List<Query> queries = new ArrayList<>(); // append-only list, so ArrayList over LinkedList
-    for (int i = 0; i < numSplits; i++) {
-      queries.add(query.toBuilder().clone().build());
-    }
-    return queries;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestOptions.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestOptions.java
deleted file mode 100644
index 099ebe0..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestOptions.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.datastore;
-
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
-
-import javax.annotation.Nullable;
-
-/**
- * V1Beta3 Datastore related pipeline options.
- */
-public interface V1Beta3TestOptions extends TestPipelineOptions {
-  @Description("Project ID to read from datastore")
-  @Default.String("apache-beam-testing")
-  String getProject();
-  void setProject(String value);
-
-  @Description("Datastore Entity kind")
-  @Default.String("beam-test")
-  String getKind();
-  void setKind(String value);
-
-  @Description("Datastore Namespace")
-  String getNamespace();
-  void setNamespace(@Nullable String value);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
deleted file mode 100644
index 7eaf23e..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.datastore;
-
-import static com.google.datastore.v1beta3.QueryResultBatch.MoreResultsType.NOT_FINISHED;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeDelete;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
-
-import org.apache.beam.sdk.options.GcpOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
-import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
-
-import com.google.api.client.auth.oauth2.Credential;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.datastore.v1beta3.CommitRequest;
-import com.google.datastore.v1beta3.Entity;
-import com.google.datastore.v1beta3.EntityResult;
-import com.google.datastore.v1beta3.Key;
-import com.google.datastore.v1beta3.Key.PathElement;
-import com.google.datastore.v1beta3.Mutation;
-import com.google.datastore.v1beta3.PropertyFilter;
-import com.google.datastore.v1beta3.Query;
-import com.google.datastore.v1beta3.QueryResultBatch;
-import com.google.datastore.v1beta3.RunQueryRequest;
-import com.google.datastore.v1beta3.RunQueryResponse;
-import com.google.datastore.v1beta3.client.Datastore;
-import com.google.datastore.v1beta3.client.DatastoreException;
-import com.google.datastore.v1beta3.client.DatastoreFactory;
-import com.google.datastore.v1beta3.client.DatastoreOptions;
-import com.google.protobuf.Int32Value;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.UUID;
-import javax.annotation.Nullable;
-
-class V1Beta3TestUtil {
-  private static final Logger LOG = LoggerFactory.getLogger(V1Beta3TestUtil.class);
-
-  /**
-   * A helper function to create the ancestor key for all created and queried entities.
-   */
-  static Key makeAncestorKey(@Nullable String namespace, String kind, String ancestor) {
-    Key.Builder keyBuilder = makeKey(kind, ancestor);
-    if (namespace != null) {
-      keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
-    }
-    return keyBuilder.build();
-  }
-
-  /**
-   * Build a datastore ancestor query for the specified kind, namespace and ancestor.
-   */
-  static Query makeAncestorKindQuery(String kind, @Nullable String namespace, String ancestor) {
-    Query.Builder q = Query.newBuilder();
-    q.addKindBuilder().setName(kind);
-    q.setFilter(makeFilter(
-        "__key__",
-        PropertyFilter.Operator.HAS_ANCESTOR,
-        makeValue(makeAncestorKey(namespace, kind, ancestor))));
-    return q.build();
-  }
-
-  /**
-   * Build an entity for the given ancestorKey, kind, namespace and value.
-   */
-  static Entity makeEntity(Long value, Key ancestorKey, String kind, @Nullable String namespace) {
-    Entity.Builder entityBuilder = Entity.newBuilder();
-    Key.Builder keyBuilder = makeKey(ancestorKey, kind, UUID.randomUUID().toString());
-    // NOTE: Namespace is not inherited between keys created with DatastoreHelper.makeKey, so
-    // we must set the namespace on keyBuilder. TODO: Once partitionId inheritance is added,
-    // we can simplify this code.
-    if (namespace != null) {
-      keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
-    }
-
-    entityBuilder.setKey(keyBuilder.build());
-    entityBuilder.getMutableProperties().put("value", makeValue(value).build());
-    return entityBuilder.build();
-  }
-
-  /**
-   * A DoFn that creates entity for a long number.
-   */
-  static class CreateEntityFn extends DoFn<Long, Entity> {
-    private final String kind;
-    @Nullable
-    private final String namespace;
-    private Key ancestorKey;
-
-    CreateEntityFn(String kind, @Nullable String namespace, String ancestor) {
-      this.kind = kind;
-      this.namespace = namespace;
-      // Build the ancestor key for all created entities once, including the namespace.
-      ancestorKey = makeAncestorKey(namespace, kind, ancestor);
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
-      c.output(makeEntity(c.element(), ancestorKey, kind, namespace));
-    }
-  }
-
-  /**
-   * Build a new datastore client.
-   */
-  static Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) {
-    DatastoreOptions.Builder builder =
-        new DatastoreOptions.Builder()
-            .projectId(projectId)
-            .initializer(
-                new RetryHttpRequestInitializer()
-            );
-
-    Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
-    if (credential != null) {
-      builder.credential(credential);
-    }
-    return DatastoreFactory.get().create(builder.build());
-  }
-
-  /**
-   * Build a datastore query request.
-   */
-  private static RunQueryRequest makeRequest(Query query, @Nullable String namespace) {
-    RunQueryRequest.Builder requestBuilder = RunQueryRequest.newBuilder().setQuery(query);
-    if (namespace != null) {
-      requestBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
-    }
-    return requestBuilder.build();
-  }
-
-  /**
-   * Delete all entities with the given ancestor.
-   */
-  static void deleteAllEntities(V1Beta3TestOptions options, String ancestor) throws Exception {
-    Datastore datastore = getDatastore(options, options.getProject());
-    Query query = V1Beta3TestUtil.makeAncestorKindQuery(
-        options.getKind(), options.getNamespace(), ancestor);
-
-    V1Beta3TestReader reader = new V1Beta3TestReader(datastore, query, options.getNamespace());
-    V1Beta3TestWriter writer = new V1Beta3TestWriter(datastore, new DeleteMutationBuilder());
-
-    long numEntities = 0;
-    while (reader.advance()) {
-      Entity entity = reader.getCurrent();
-      numEntities++;
-      writer.write(entity);
-    }
-
-    writer.close();
-    LOG.info("Successfully deleted {} entities", numEntities);
-  }
-
-  /**
-   * Returns the total number of entities for the given datastore.
-   */
-  static long countEntities(V1Beta3TestOptions options, String ancestor) throws Exception {
-    // Read from datastore.
-    Datastore datastore = V1Beta3TestUtil.getDatastore(options, options.getProject());
-    Query query = V1Beta3TestUtil.makeAncestorKindQuery(
-        options.getKind(), options.getNamespace(), ancestor);
-
-    V1Beta3TestReader reader = new V1Beta3TestReader(datastore, query, options.getNamespace());
-
-    long numEntitiesRead = 0;
-    while (reader.advance()) {
-      reader.getCurrent();
-      numEntitiesRead++;
-    }
-    return numEntitiesRead;
-  }
-
-  /**
-   * An interface to represent any datastore mutation operation.
-   * Mutation operations include insert, delete, upsert, update.
-   */
-  interface MutationBuilder {
-    Mutation.Builder apply(Entity entity);
-  }
-
-  /**
-   *A MutationBuilder that performs upsert operation.
-   */
-  static class UpsertMutationBuilder implements MutationBuilder {
-    public Mutation.Builder apply(Entity entity) {
-      return makeUpsert(entity);
-    }
-  }
-
-  /**
-   * A MutationBuilder that performs delete operation.
-   */
-  static class DeleteMutationBuilder implements MutationBuilder {
-    public Mutation.Builder apply(Entity entity) {
-      return makeDelete(entity.getKey());
-    }
-  }
-
-  /**
-   * A helper class to write entities to datastore.
-   */
-  static class V1Beta3TestWriter {
-    private static final Logger LOG = LoggerFactory.getLogger(V1Beta3TestWriter.class);
-    // Limits the number of entities updated per batch
-    private static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
-    // Number of times to retry on update failure
-    private static final int MAX_RETRIES = 5;
-    //Initial backoff time for exponential backoff for retry attempts.
-    private static final int INITIAL_BACKOFF_MILLIS = 5000;
-
-    // Returns true if a Datastore key is complete. A key is complete if its last element
-    // has either an id or a name.
-    static boolean isValidKey(Key key) {
-      List<PathElement> elementList = key.getPathList();
-      if (elementList.isEmpty()) {
-        return false;
-      }
-      PathElement lastElement = elementList.get(elementList.size() - 1);
-      return (lastElement.getId() != 0 || !lastElement.getName().isEmpty());
-    }
-
-    private final Datastore datastore;
-    private final MutationBuilder mutationBuilder;
-    private final List<Entity> entities = new ArrayList<>();
-
-    V1Beta3TestWriter(Datastore datastore, MutationBuilder mutationBuilder) {
-      this.datastore = datastore;
-      this.mutationBuilder = mutationBuilder;
-    }
-
-    void write(Entity value) throws Exception {
-      // Verify that the entity to write has a complete key.
-      if (!isValidKey(value.getKey())) {
-        throw new IllegalArgumentException(
-            "Entities to be written to the Datastore must have complete keys");
-      }
-
-      entities.add(value);
-
-      if (entities.size() >= DATASTORE_BATCH_UPDATE_LIMIT) {
-        flushBatch();
-      }
-    }
-
-    void close() throws Exception {
-      // flush any remaining entities
-      if (entities.size() > 0) {
-        flushBatch();
-      }
-    }
-
-    // commit the list of entities to datastore
-    private void flushBatch() throws DatastoreException, IOException, InterruptedException {
-      LOG.info("Writing batch of {} entities", entities.size());
-      Sleeper sleeper = Sleeper.DEFAULT;
-      BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
-
-      while (true) {
-        // Batch mutate entities.
-        try {
-          CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
-          for (Entity entity: entities) {
-            commitRequest.addMutations(mutationBuilder.apply(entity));
-          }
-          commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
-          datastore.commit(commitRequest.build());
-          // Break if the commit threw no exception.
-          break;
-        } catch (DatastoreException exception) {
-          LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
-              exception.getMessage());
-          if (!BackOffUtils.next(sleeper, backoff)) {
-            LOG.error("Aborting after {} retries.", MAX_RETRIES);
-            throw exception;
-          }
-        }
-      }
-      LOG.info("Successfully wrote {} entities", entities.size());
-      entities.clear();
-    }
-  }
-
-  /**
-   * A helper class to read entities from datastore.
-   */
-  static class V1Beta3TestReader {
-    private static final int QUERY_BATCH_LIMIT = 500;
-    private final Datastore datastore;
-    private final Query query;
-    @Nullable
-    private final String namespace;
-    private boolean moreResults;
-    private java.util.Iterator<EntityResult> entities;
-    // Current batch of query results
-    private QueryResultBatch currentBatch;
-    private Entity currentEntity;
-
-    V1Beta3TestReader(Datastore datastore, Query query, @Nullable String namespace) {
-      this.datastore = datastore;
-      this.query = query;
-      this.namespace = namespace;
-    }
-
-    Entity getCurrent() {
-      return currentEntity;
-    }
-
-    boolean advance() throws IOException {
-      if (entities == null || (!entities.hasNext() && moreResults)) {
-        try {
-          entities = getIteratorAndMoveCursor();
-        } catch (DatastoreException e) {
-          throw new IOException(e);
-        }
-      }
-
-      if (entities == null || !entities.hasNext()) {
-        currentEntity = null;
-        return false;
-      }
-
-      currentEntity = entities.next().getEntity();
-      return true;
-    }
-
-    // Read the next batch of query results.
-    private Iterator<EntityResult> getIteratorAndMoveCursor() throws DatastoreException {
-      Query.Builder query = this.query.toBuilder().clone();
-      query.setLimit(Int32Value.newBuilder().setValue(QUERY_BATCH_LIMIT));
-      if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) {
-        query.setStartCursor(currentBatch.getEndCursor());
-      }
-
-      RunQueryRequest request = makeRequest(query.build(), namespace);
-      RunQueryResponse response = datastore.runQuery(request);
-
-      currentBatch = response.getBatch();
-
-      int numFetch = currentBatch.getEntityResultsCount();
-      // All indications from the API are that there are/may be more results.
-      moreResults = ((numFetch == QUERY_BATCH_LIMIT)
-              || (currentBatch.getMoreResults() == NOT_FINISHED));
-
-      // May receive a batch of 0 results if the number of records is a multiple
-      // of the request limit.
-      if (numFetch == 0) {
-        return null;
-      }
-
-      return currentBatch.getEntityResultsList().iterator();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3WriteIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3WriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3WriteIT.java
deleted file mode 100644
index 782065f..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3WriteIT.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.datastore;
-
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.countEntities;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.deleteAllEntities;
-import static org.junit.Assert.assertEquals;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.CountingInput;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.CreateEntityFn;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.ParDo;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.UUID;
-
-/**
- * End-to-end tests for Datastore V1Beta3.Write.
- */
-@RunWith(JUnit4.class)
-public class V1Beta3WriteIT {
-  private V1Beta3TestOptions options;
-  private String ancestor;
-  private final long numEntities = 1000;
-
-  @Before
-  public void setup() {
-    PipelineOptionsFactory.register(V1Beta3TestOptions.class);
-    options = TestPipeline.testingPipelineOptions().as(V1Beta3TestOptions.class);
-    ancestor = UUID.randomUUID().toString();
-  }
-
-  /**
-   * An end-to-end test for {@link V1Beta3.Write}.
-   *
-   * Write some test entities to datastore through a dataflow pipeline.
-   * Read and count all the entities. Verify that the count matches the
-   * number of entities written.
-   */
-  @Test
-  public void testE2EV1Beta3Write() throws Exception {
-    Pipeline p = Pipeline.create(options);
-
-    // Write to datastore
-    p.apply(CountingInput.upTo(numEntities))
-        .apply(ParDo.of(new CreateEntityFn(
-            options.getKind(), options.getNamespace(), ancestor)))
-        .apply(DatastoreIO.v1beta3().write().withProjectId(options.getProject()));
-
-    p.run();
-
-    // Count number of entities written to datastore.
-    long numEntitiesWritten = countEntities(options, ancestor);
-
-    assertEquals(numEntitiesWritten, numEntities);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    deleteAllEntities(options, ancestor);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
new file mode 100644
index 0000000..8fedc77
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.deleteAllEntities;
+import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.getDatastore;
+import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.makeAncestorKey;
+import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.makeEntity;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.UpsertMutationBuilder;
+import org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.V1TestWriter;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.Key;
+import com.google.datastore.v1.Query;
+import com.google.datastore.v1.client.Datastore;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.UUID;
+
+/**
+ * End-to-end tests for {@link DatastoreV1.Read}.
+ */
+@RunWith(JUnit4.class)
+public class V1ReadIT {
+  private V1TestOptions options;
+  private String ancestor;
+  private final long numEntities = 1000;
+
+  @Before
+  public void setup() {
+    PipelineOptionsFactory.register(V1TestOptions.class);
+    options = TestPipeline.testingPipelineOptions().as(V1TestOptions.class);
+    ancestor = UUID.randomUUID().toString();
+  }
+
+  /**
+   * An end-to-end test for {@link DatastoreV1.Read}.
+   *
+   * <p>Writes some test entities to datastore and then runs a dataflow pipeline that
+   * reads and counts the total number of entities. Verifies that the count matches the
+   * number of entities written.
+   */
+  @Test
+  public void testE2EV1Read() throws Exception {
+    // Create entities and write them to datastore
+    writeEntitiesToDatastore(options, ancestor, numEntities);
+
+    // Read from datastore
+    Query query = V1TestUtil.makeAncestorKindQuery(
+        options.getKind(), options.getNamespace(), ancestor);
+
+    DatastoreV1.Read read = DatastoreIO.v1().read()
+        .withProjectId(options.getProject())
+        .withQuery(query)
+        .withNamespace(options.getNamespace());
+
+    // Count the total number of entities
+    Pipeline p = Pipeline.create(options);
+    PCollection<Long> count = p
+        .apply(read)
+        .apply(Count.<Entity>globally());
+
+    PAssert.thatSingleton(count).isEqualTo(numEntities);
+    p.run();
+  }
+
+  // Creates entities and writes them to datastore.
+  private static void writeEntitiesToDatastore(V1TestOptions options, String ancestor,
+      long numEntities) throws Exception {
+    Datastore datastore = getDatastore(options, options.getProject());
+    // Write test entities to datastore
+    V1TestWriter writer = new V1TestWriter(datastore, new UpsertMutationBuilder());
+    Key ancestorKey = makeAncestorKey(options.getNamespace(), options.getKind(), ancestor);
+
+    for (long i = 0; i < numEntities; i++) {
+      Entity entity = makeEntity(i, ancestorKey, options.getKind(), options.getNamespace());
+      writer.write(entity);
+    }
+    writer.close();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    deleteAllEntities(options, ancestor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestOptions.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestOptions.java
new file mode 100644
index 0000000..360855f
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestOptions.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+
+import javax.annotation.Nullable;
+
+/**
+ * DatastoreV1-related pipeline options used by the Datastore tests.
+ */
+public interface V1TestOptions extends TestPipelineOptions {
+  @Description("Project ID to read from datastore")
+  @Default.String("apache-beam-testing")
+  String getProject();
+  void setProject(String value);
+
+  @Description("Datastore Entity kind")
+  @Default.String("beam-test")
+  String getKind();
+  void setKind(String value);
+
+  @Description("Datastore Namespace")
+  String getNamespace();
+  void setNamespace(@Nullable String value);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
new file mode 100644
index 0000000..1e323ec
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static com.google.datastore.v1.QueryResultBatch.MoreResultsType.NOT_FINISHED;
+import static com.google.datastore.v1.client.DatastoreHelper.makeDelete;
+import static com.google.datastore.v1.client.DatastoreHelper.makeFilter;
+import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
+import static com.google.datastore.v1.client.DatastoreHelper.makeUpsert;
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
+
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
+
+import com.google.api.client.auth.oauth2.Credential;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+import com.google.datastore.v1.CommitRequest;
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.EntityResult;
+import com.google.datastore.v1.Key;
+import com.google.datastore.v1.Key.PathElement;
+import com.google.datastore.v1.Mutation;
+import com.google.datastore.v1.PropertyFilter;
+import com.google.datastore.v1.Query;
+import com.google.datastore.v1.QueryResultBatch;
+import com.google.datastore.v1.RunQueryRequest;
+import com.google.datastore.v1.RunQueryResponse;
+import com.google.datastore.v1.client.Datastore;
+import com.google.datastore.v1.client.DatastoreException;
+import com.google.datastore.v1.client.DatastoreFactory;
+import com.google.datastore.v1.client.DatastoreOptions;
+import com.google.protobuf.Int32Value;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import javax.annotation.Nullable;
+
+class V1TestUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(V1TestUtil.class);
+
+  /**
+   * Creates the key shared as ancestor by every entity the tests create and query.
+   *
+   * @param namespace namespace to set on the key's partition id, or {@code null} to leave it unset
+   * @param kind kind of the ancestor key
+   * @param ancestor name of the ancestor key
+   */
+  static Key makeAncestorKey(@Nullable String namespace, String kind, String ancestor) {
+    Key.Builder ancestorKey = makeKey(kind, ancestor);
+    if (namespace == null) {
+      return ancestorKey.build();
+    }
+    ancestorKey.getPartitionIdBuilder().setNamespaceId(namespace);
+    return ancestorKey.build();
+  }
+
+  /**
+   * Creates a query for entities of the given kind whose key has the ancestor produced by
+   * {@link #makeAncestorKey} for the same namespace, kind and ancestor name.
+   */
+  static Query makeAncestorKindQuery(String kind, @Nullable String namespace, String ancestor) {
+    Key ancestorKey = makeAncestorKey(namespace, kind, ancestor);
+
+    Query.Builder queryBuilder = Query.newBuilder();
+    queryBuilder.addKindBuilder().setName(kind);
+    queryBuilder.setFilter(
+        makeFilter("__key__", PropertyFilter.Operator.HAS_ANCESTOR, makeValue(ancestorKey)));
+    return queryBuilder.build();
+  }
+
+  /**
+   * Creates an entity of the given kind under {@code ancestorKey}, named with a random UUID
+   * and carrying {@code value} in its "value" property.
+   */
+  static Entity makeEntity(Long value, Key ancestorKey, String kind, @Nullable String namespace) {
+    Key.Builder entityKey = makeKey(ancestorKey, kind, UUID.randomUUID().toString());
+    // NOTE: Namespace is not inherited between keys created with DatastoreHelper.makeKey, so
+    // it has to be set explicitly on the new key. TODO: Once partitionId inheritance is added,
+    // this can be simplified.
+    if (namespace != null) {
+      entityKey.getPartitionIdBuilder().setNamespaceId(namespace);
+    }
+
+    Entity.Builder entity = Entity.newBuilder().setKey(entityKey.build());
+    entity.getMutableProperties().put("value", makeValue(value).build());
+    return entity.build();
+  }
+
+  /**
+   * A DoFn that creates an entity for each input long number.
+   *
+   * <p>The ancestor key is built once in the constructor from the namespace, kind and
+   * ancestor name; every element is then converted with {@code makeEntity}.
+   */
+  static class CreateEntityFn extends DoFn<Long, Entity> {
+    private final String kind;
+    @Nullable
+    private final String namespace;
+    // Assigned exactly once in the constructor, so it is final like the other fields.
+    private final Key ancestorKey;
+
+    CreateEntityFn(String kind, @Nullable String namespace, String ancestor) {
+      this.kind = kind;
+      this.namespace = namespace;
+      // Build the ancestor key for all created entities once, including the namespace.
+      this.ancestorKey = makeAncestorKey(namespace, kind, ancestor);
+    }
+
+    /** Outputs one entity built from the current element. */
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      c.output(makeEntity(c.element(), ancestorKey, kind, namespace));
+    }
+  }
+
+  /**
+   * Creates a datastore client for {@code projectId}, using the GCP credential from the
+   * pipeline options when one is available.
+   */
+  static Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) {
+    Credential gcpCredential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
+
+    DatastoreOptions.Builder optionsBuilder =
+        new DatastoreOptions.Builder()
+            .projectId(projectId)
+            .initializer(new RetryHttpRequestInitializer());
+    // A credential is attached only when the options supply one.
+    if (gcpCredential != null) {
+      optionsBuilder.credential(gcpCredential);
+    }
+
+    DatastoreOptions datastoreOptions = optionsBuilder.build();
+    return DatastoreFactory.get().create(datastoreOptions);
+  }
+
+  /**
+   * Wraps {@code query} in a run-query request, setting the namespace on the request's
+   * partition id when {@code namespace} is not null.
+   */
+  private static RunQueryRequest makeRequest(Query query, @Nullable String namespace) {
+    RunQueryRequest.Builder request = RunQueryRequest.newBuilder().setQuery(query);
+    if (namespace == null) {
+      return request.build();
+    }
+    request.getPartitionIdBuilder().setNamespaceId(namespace);
+    return request.build();
+  }
+
+  /**
+   * Deletes every entity matched by the ancestor query built from the options' kind and
+   * namespace and the given ancestor, then logs how many were deleted.
+   */
+  static void deleteAllEntities(V1TestOptions options, String ancestor) throws Exception {
+    String namespace = options.getNamespace();
+    Datastore client = getDatastore(options, options.getProject());
+    Query ancestorQuery = makeAncestorKindQuery(options.getKind(), namespace, ancestor);
+
+    V1TestReader source = new V1TestReader(client, ancestorQuery, namespace);
+    V1TestWriter deleter = new V1TestWriter(client, new DeleteMutationBuilder());
+
+    long deleted = 0;
+    for (; source.advance(); deleted++) {
+      deleter.write(source.getCurrent());
+    }
+
+    // Flush whatever is still buffered in the writer.
+    deleter.close();
+    LOG.info("Successfully deleted {} entities", deleted);
+  }
+
+  /**
+   * Returns the number of entities matched by the ancestor query built from the options'
+   * kind and namespace and the given ancestor.
+   */
+  static long countEntities(V1TestOptions options, String ancestor) throws Exception {
+    // Read from datastore.
+    Datastore datastore = getDatastore(options, options.getProject());
+    Query query = makeAncestorKindQuery(options.getKind(), options.getNamespace(), ancestor);
+
+    V1TestReader reader = new V1TestReader(datastore, query, options.getNamespace());
+
+    // Only the count matters, so each result is advanced past without being inspected.
+    long numEntitiesRead = 0;
+    while (reader.advance()) {
+      numEntitiesRead++;
+    }
+    return numEntitiesRead;
+  }
+
+  /**
+   * An interface to represent any datastore mutation operation.
+   * Mutation operations include insert, delete, upsert, update.
+   *
+   * <p>Implementations turn a single entity into the mutation to be committed for it.
+   */
+  interface MutationBuilder {
+    /** Returns the mutation builder to apply for {@code entity}. */
+    Mutation.Builder apply(Entity entity);
+  }
+
+  /**
+   * A MutationBuilder that performs upsert operation.
+   */
+  static class UpsertMutationBuilder implements MutationBuilder {
+    /** Returns an upsert mutation for {@code entity}. */
+    @Override
+    public Mutation.Builder apply(Entity entity) {
+      return makeUpsert(entity);
+    }
+  }
+
+  /**
+   * A MutationBuilder that performs delete operation.
+   */
+  static class DeleteMutationBuilder implements MutationBuilder {
+    /** Returns a delete mutation for the key of {@code entity}. */
+    @Override
+    public Mutation.Builder apply(Entity entity) {
+      return makeDelete(entity.getKey());
+    }
+  }
+
+  /**
+   * A helper class to write entities to datastore.
+   *
+   * <p>Entities passed to {@link #write} are buffered and committed as one batch once
+   * {@code DATASTORE_BATCH_UPDATE_LIMIT} of them have accumulated; {@link #close} commits
+   * whatever is still buffered.
+   */
+  static class V1TestWriter {
+    private static final Logger LOG = LoggerFactory.getLogger(V1TestWriter.class);
+    // Limits the number of entities updated per batch
+    private static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
+    // Number of times to retry on update failure
+    private static final int MAX_RETRIES = 5;
+    // Initial backoff time for exponential backoff for retry attempts.
+    private static final int INITIAL_BACKOFF_MILLIS = 5000;
+
+    /**
+     * Returns true if a Datastore key is complete. A key is complete if its last element
+     * has either an id or a name.
+     */
+    static boolean isValidKey(Key key) {
+      List<PathElement> elementList = key.getPathList();
+      if (elementList.isEmpty()) {
+        return false;
+      }
+      PathElement lastElement = elementList.get(elementList.size() - 1);
+      return (lastElement.getId() != 0 || !lastElement.getName().isEmpty());
+    }
+
+    private final Datastore datastore;
+    private final MutationBuilder mutationBuilder;
+    // Entities buffered since the last successful commit.
+    private final List<Entity> entities = new ArrayList<>();
+
+    V1TestWriter(Datastore datastore, MutationBuilder mutationBuilder) {
+      this.datastore = datastore;
+      this.mutationBuilder = mutationBuilder;
+    }
+
+    /**
+     * Buffers {@code value} and commits the buffer once it reaches the batch limit.
+     *
+     * @throws IllegalArgumentException if the entity's key is not complete
+     */
+    void write(Entity value) throws Exception {
+      // Verify that the entity to write has a complete key.
+      if (!isValidKey(value.getKey())) {
+        throw new IllegalArgumentException(
+            "Entities to be written to the Datastore must have complete keys");
+      }
+
+      entities.add(value);
+
+      if (entities.size() >= DATASTORE_BATCH_UPDATE_LIMIT) {
+        flushBatch();
+      }
+    }
+
+    /** Commits any entities that are still buffered. */
+    void close() throws Exception {
+      // flush any remaining entities
+      if (!entities.isEmpty()) {
+        flushBatch();
+      }
+    }
+
+    /**
+     * Commits the buffered entities to datastore in one non-transactional request, retrying
+     * with backoff on {@link DatastoreException}, and clears the buffer on success. When the
+     * backoff is exhausted the last exception is rethrown and the buffer is left untouched.
+     */
+    private void flushBatch() throws DatastoreException, IOException, InterruptedException {
+      LOG.info("Writing batch of {} entities", entities.size());
+
+      // The request is identical for every attempt, so build it once outside the retry loop.
+      CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+      for (Entity entity : entities) {
+        commitRequest.addMutations(mutationBuilder.apply(entity));
+      }
+      commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+      CommitRequest request = commitRequest.build();
+
+      Sleeper sleeper = Sleeper.DEFAULT;
+      BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
+
+      while (true) {
+        // Batch mutate entities.
+        try {
+          datastore.commit(request);
+          // Break if the commit threw no exception.
+          break;
+        } catch (DatastoreException exception) {
+          LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
+              exception.getMessage());
+          if (!BackOffUtils.next(sleeper, backoff)) {
+            LOG.error("Aborting after {} retries.", MAX_RETRIES);
+            throw exception;
+          }
+        }
+      }
+      LOG.info("Successfully wrote {} entities", entities.size());
+      entities.clear();
+    }
+  }
+
+  /**
+   * A helper class to read entities from datastore.
+   *
+   * <p>Results are fetched in batches of {@code QUERY_BATCH_LIMIT}; call {@link #advance}
+   * to move to the next entity and {@link #getCurrent} to retrieve it.
+   */
+  static class V1TestReader {
+    private static final int QUERY_BATCH_LIMIT = 500;
+    private final Datastore datastore;
+    private final Query query;
+    @Nullable
+    private final String namespace;
+    // Whether the last fetched batch indicated that further results may exist.
+    private boolean moreResults;
+    // Iterator over the current batch; null before the first fetch or after an empty batch.
+    private Iterator<EntityResult> entities;
+    // Current batch of query results
+    private QueryResultBatch currentBatch;
+    private Entity currentEntity;
+
+    V1TestReader(Datastore datastore, Query query, @Nullable String namespace) {
+      this.datastore = datastore;
+      this.query = query;
+      this.namespace = namespace;
+    }
+
+    /** Returns the entity selected by the last {@link #advance} call, or null if none. */
+    Entity getCurrent() {
+      return currentEntity;
+    }
+
+    /**
+     * Moves to the next entity, fetching another batch when the current one is used up and
+     * more results are expected.
+     *
+     * @return true if an entity is available through {@link #getCurrent}, false otherwise
+     * @throws IOException wrapping any {@link DatastoreException} raised by the query
+     */
+    boolean advance() throws IOException {
+      if (entities == null || (!entities.hasNext() && moreResults)) {
+        try {
+          entities = getIteratorAndMoveCursor();
+        } catch (DatastoreException e) {
+          throw new IOException(e);
+        }
+      }
+
+      // NOTE(review): an empty batch ends iteration here even when moreResults is true;
+      // confirm that this is intended.
+      if (entities == null || !entities.hasNext()) {
+        currentEntity = null;
+        return false;
+      }
+
+      currentEntity = entities.next().getEntity();
+      return true;
+    }
+
+    /**
+     * Reads the next batch of query results, starting at the end cursor of the previous
+     * batch when there is one.
+     *
+     * @return an iterator over the fetched results, or null if the batch is empty
+     */
+    private Iterator<EntityResult> getIteratorAndMoveCursor() throws DatastoreException {
+      // toBuilder() already yields a fresh builder, so no additional copy is needed.
+      Query.Builder batchQuery = this.query.toBuilder();
+      batchQuery.setLimit(Int32Value.newBuilder().setValue(QUERY_BATCH_LIMIT));
+      if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) {
+        batchQuery.setStartCursor(currentBatch.getEndCursor());
+      }
+
+      RunQueryRequest request = makeRequest(batchQuery.build(), namespace);
+      RunQueryResponse response = datastore.runQuery(request);
+
+      currentBatch = response.getBatch();
+
+      int numFetch = currentBatch.getEntityResultsCount();
+      // All indications from the API are that there are/may be more results.
+      moreResults = ((numFetch == QUERY_BATCH_LIMIT)
+          || (currentBatch.getMoreResults() == NOT_FINISHED));
+
+      // May receive a batch of 0 results if the number of records is a multiple
+      // of the request limit.
+      if (numFetch == 0) {
+        return null;
+      }
+
+      return currentBatch.getEntityResultsList().iterator();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
new file mode 100644
index 0000000..b97c05c
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.countEntities;
+import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.deleteAllEntities;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.CreateEntityFn;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.ParDo;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.UUID;
+
+/**
+ * End-to-end tests for Datastore DatastoreV1.Write.
+ */
+@RunWith(JUnit4.class)
+public class V1WriteIT {
+  private V1TestOptions options;
+  // Random per-test ancestor name, so each run counts and deletes only its own entities.
+  private String ancestor;
+  private final long numEntities = 1000;
+
+  /** Registers the test options and picks a fresh random ancestor for this test. */
+  @Before
+  public void setup() {
+    PipelineOptionsFactory.register(V1TestOptions.class);
+    options = TestPipeline.testingPipelineOptions().as(V1TestOptions.class);
+    ancestor = UUID.randomUUID().toString();
+  }
+
+  /**
+   * An end-to-end test for {@link DatastoreV1.Write}.
+   *
+   * <p>Write some test entities to datastore through a dataflow pipeline.
+   * Read and count all the entities. Verify that the count matches the
+   * number of entities written.
+   */
+  @Test
+  public void testE2EV1Write() throws Exception {
+    Pipeline p = Pipeline.create(options);
+
+    // Write to datastore
+    p.apply(CountingInput.upTo(numEntities))
+        .apply(ParDo.of(new CreateEntityFn(
+            options.getKind(), options.getNamespace(), ancestor)))
+        .apply(DatastoreIO.v1().write().withProjectId(options.getProject()));
+
+    p.run();
+
+    // Count number of entities written to datastore.
+    long numEntitiesWritten = countEntities(options, ancestor);
+
+    // assertEquals takes (expected, actual); this order keeps the failure message accurate.
+    assertEquals(numEntities, numEntitiesWritten);
+  }
+
+  /** Deletes the entities created under this test's ancestor. */
+  @After
+  public void tearDown() throws Exception {
+    deleteAllEntities(options, ancestor);
+  }
+}