You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/20 01:10:50 UTC
[2/4] incubator-beam git commit: DatastoreIO v1beta3 to v1
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
deleted file mode 100644
index b0c6c18..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
+++ /dev/null
@@ -1,792 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.datastore;
-
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DATASTORE_BATCH_UPDATE_LIMIT;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.DEFAULT_BUNDLE_SIZE_BYTES;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.QUERY_BATCH_LIMIT;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.getEstimatedSizeBytes;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.makeRequest;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.isValidKey;
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static com.google.datastore.v1beta3.PropertyFilter.Operator.EQUAL;
-import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeDelete;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.hasItem;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DatastoreWriterFn;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteEntity;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteEntityFn;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteKey;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteKeyFn;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.ReadFn;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.SplitQueryFn;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.V1Beta3Options;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.UpsertFn;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.V1Beta3DatastoreFactory;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Write;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.testing.RunnableOnService;
-import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.transforms.DoFnTester.CloningBehavior;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.POutput;
-
-import com.google.datastore.v1beta3.CommitRequest;
-import com.google.datastore.v1beta3.Entity;
-import com.google.datastore.v1beta3.EntityResult;
-import com.google.datastore.v1beta3.Key;
-import com.google.datastore.v1beta3.Mutation;
-import com.google.datastore.v1beta3.PartitionId;
-import com.google.datastore.v1beta3.Query;
-import com.google.datastore.v1beta3.QueryResultBatch;
-import com.google.datastore.v1beta3.RunQueryRequest;
-import com.google.datastore.v1beta3.RunQueryResponse;
-import com.google.datastore.v1beta3.client.Datastore;
-import com.google.datastore.v1beta3.client.QuerySplitter;
-import com.google.protobuf.Int32Value;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Tests for {@link V1Beta3}.
- */
-@RunWith(JUnit4.class)
-public class V1Beta3Test {
- private static final String PROJECT_ID = "testProject";
- private static final String NAMESPACE = "testNamespace";
- private static final String KIND = "testKind";
- private static final Query QUERY;
- private static final V1Beta3Options v1Beta3Options;
- static {
- Query.Builder q = Query.newBuilder();
- q.addKindBuilder().setName(KIND);
- QUERY = q.build();
- v1Beta3Options = V1Beta3Options.from(PROJECT_ID, QUERY, NAMESPACE);
- }
- private V1Beta3.Read initialRead;
-
- @Mock
- Datastore mockDatastore;
- @Mock
- QuerySplitter mockQuerySplitter;
- @Mock
- V1Beta3DatastoreFactory mockDatastoreFactory;
-
- @Rule
- public final ExpectedException thrown = ExpectedException.none();
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
-
- initialRead = DatastoreIO.v1beta3().read()
- .withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
-
- when(mockDatastoreFactory.getDatastore(any(PipelineOptions.class), any(String.class)))
- .thenReturn(mockDatastore);
- when(mockDatastoreFactory.getQuerySplitter())
- .thenReturn(mockQuerySplitter);
- }
-
- @Test
- public void testBuildRead() throws Exception {
- V1Beta3.Read read = DatastoreIO.v1beta3().read()
- .withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
- assertEquals(QUERY, read.getQuery());
- assertEquals(PROJECT_ID, read.getProjectId());
- assertEquals(NAMESPACE, read.getNamespace());
- }
-
- /**
- * {@link #testBuildRead} but constructed in a different order.
- */
- @Test
- public void testBuildReadAlt() throws Exception {
- V1Beta3.Read read = DatastoreIO.v1beta3().read()
- .withProjectId(PROJECT_ID).withNamespace(NAMESPACE).withQuery(QUERY);
- assertEquals(QUERY, read.getQuery());
- assertEquals(PROJECT_ID, read.getProjectId());
- assertEquals(NAMESPACE, read.getNamespace());
- }
-
- @Test
- public void testReadValidationFailsProject() throws Exception {
- V1Beta3.Read read = DatastoreIO.v1beta3().read().withQuery(QUERY);
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("project");
- read.validate(null);
- }
-
- @Test
- public void testReadValidationFailsQuery() throws Exception {
- V1Beta3.Read read = DatastoreIO.v1beta3().read().withProjectId(PROJECT_ID);
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("query");
- read.validate(null);
- }
-
- @Test
- public void testReadValidationFailsQueryLimitZero() throws Exception {
- Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(0)).build();
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Invalid query limit 0: must be positive");
-
- DatastoreIO.v1beta3().read().withQuery(invalidLimit);
- }
-
- @Test
- public void testReadValidationFailsQueryLimitNegative() throws Exception {
- Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(-5)).build();
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Invalid query limit -5: must be positive");
-
- DatastoreIO.v1beta3().read().withQuery(invalidLimit);
- }
-
- @Test
- public void testReadValidationSucceedsNamespace() throws Exception {
- V1Beta3.Read read = DatastoreIO.v1beta3().read().withProjectId(PROJECT_ID).withQuery(QUERY);
- /* Should succeed, as a null namespace is fine. */
- read.validate(null);
- }
-
- @Test
- public void testReadDisplayData() {
- V1Beta3.Read read = DatastoreIO.v1beta3().read()
- .withProjectId(PROJECT_ID)
- .withQuery(QUERY)
- .withNamespace(NAMESPACE);
-
- DisplayData displayData = DisplayData.from(read);
-
- assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
- assertThat(displayData, hasDisplayItem("query", QUERY.toString()));
- assertThat(displayData, hasDisplayItem("namespace", NAMESPACE));
- }
-
- @Test
- @Category(RunnableOnService.class)
- public void testSourcePrimitiveDisplayData() {
- DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- PTransform<PBegin, ? extends POutput> read = DatastoreIO.v1beta3().read().withProjectId(
- "myProject").withQuery(Query.newBuilder().build());
-
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
- assertThat("DatastoreIO read should include the project in its primitive display data",
- displayData, hasItem(hasDisplayItem("projectId")));
- }
-
- @Test
- public void testWriteDoesNotAllowNullProject() throws Exception {
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("projectId");
-
- DatastoreIO.v1beta3().write().withProjectId(null);
- }
-
- @Test
- public void testWriteValidationFailsWithNoProject() throws Exception {
- Write write = DatastoreIO.v1beta3().write();
-
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("projectId");
-
- write.validate(null);
- }
-
- @Test
- public void testWriteValidationSucceedsWithProject() throws Exception {
- Write write = DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID);
- write.validate(null);
- }
-
- @Test
- public void testWriteDisplayData() {
- Write write = DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID);
-
- DisplayData displayData = DisplayData.from(write);
-
- assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
- }
-
- @Test
- public void testDeleteEntityDoesNotAllowNullProject() throws Exception {
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("projectId");
-
- DatastoreIO.v1beta3().deleteEntity().withProjectId(null);
- }
-
- @Test
- public void testDeleteEntityValidationFailsWithNoProject() throws Exception {
- DeleteEntity deleteEntity = DatastoreIO.v1beta3().deleteEntity();
-
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("projectId");
-
- deleteEntity.validate(null);
- }
-
- @Test
- public void testDeleteEntityValidationSucceedsWithProject() throws Exception {
- DeleteEntity deleteEntity = DatastoreIO.v1beta3().deleteEntity().withProjectId(PROJECT_ID);
- deleteEntity.validate(null);
- }
-
- @Test
- public void testDeleteEntityDisplayData() {
- DeleteEntity deleteEntity = DatastoreIO.v1beta3().deleteEntity().withProjectId(PROJECT_ID);
-
- DisplayData displayData = DisplayData.from(deleteEntity);
-
- assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
- }
-
- @Test
- public void testDeleteKeyDoesNotAllowNullProject() throws Exception {
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("projectId");
-
- DatastoreIO.v1beta3().deleteKey().withProjectId(null);
- }
-
- @Test
- public void testDeleteKeyValidationFailsWithNoProject() throws Exception {
- DeleteKey deleteKey = DatastoreIO.v1beta3().deleteKey();
-
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("projectId");
-
- deleteKey.validate(null);
- }
-
- @Test
- public void testDeleteKeyValidationSucceedsWithProject() throws Exception {
- DeleteKey deleteKey = DatastoreIO.v1beta3().deleteKey().withProjectId(PROJECT_ID);
- deleteKey.validate(null);
- }
-
- @Test
- public void testDeleteKeyDisplayData() {
- DeleteKey deleteKey = DatastoreIO.v1beta3().deleteKey().withProjectId(PROJECT_ID);
-
- DisplayData displayData = DisplayData.from(deleteKey);
-
- assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
- }
-
- @Test
- @Category(RunnableOnService.class)
- public void testWritePrimitiveDisplayData() {
- DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- PTransform<PCollection<Entity>, ?> write =
- DatastoreIO.v1beta3().write().withProjectId("myProject");
-
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
- assertThat("DatastoreIO write should include the project in its primitive display data",
- displayData, hasItem(hasDisplayItem("projectId")));
- assertThat("DatastoreIO write should include the upsertFn in its primitive display data",
- displayData, hasItem(hasDisplayItem("upsertFn")));
-
- }
-
- @Test
- @Category(RunnableOnService.class)
- public void testDeleteEntityPrimitiveDisplayData() {
- DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- PTransform<PCollection<Entity>, ?> write =
- DatastoreIO.v1beta3().deleteEntity().withProjectId("myProject");
-
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
- assertThat("DatastoreIO write should include the project in its primitive display data",
- displayData, hasItem(hasDisplayItem("projectId")));
- assertThat("DatastoreIO write should include the deleteEntityFn in its primitive display data",
- displayData, hasItem(hasDisplayItem("deleteEntityFn")));
-
- }
-
- @Test
- @Category(RunnableOnService.class)
- public void testDeleteKeyPrimitiveDisplayData() {
- DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- PTransform<PCollection<Key>, ?> write =
- DatastoreIO.v1beta3().deleteKey().withProjectId("myProject");
-
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
- assertThat("DatastoreIO write should include the project in its primitive display data",
- displayData, hasItem(hasDisplayItem("projectId")));
- assertThat("DatastoreIO write should include the deleteKeyFn in its primitive display data",
- displayData, hasItem(hasDisplayItem("deleteKeyFn")));
-
- }
-
- /**
- * Test building a Write using builder methods.
- */
- @Test
- public void testBuildWrite() throws Exception {
- V1Beta3.Write write = DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID);
- assertEquals(PROJECT_ID, write.getProjectId());
- }
-
- /**
- * Test the detection of complete and incomplete keys.
- */
- @Test
- public void testHasNameOrId() {
- Key key;
- // Complete with name, no ancestor
- key = makeKey("bird", "finch").build();
- assertTrue(isValidKey(key));
-
- // Complete with id, no ancestor
- key = makeKey("bird", 123).build();
- assertTrue(isValidKey(key));
-
- // Incomplete, no ancestor
- key = makeKey("bird").build();
- assertFalse(isValidKey(key));
-
- // Complete with name and ancestor
- key = makeKey("bird", "owl").build();
- key = makeKey(key, "bird", "horned").build();
- assertTrue(isValidKey(key));
-
- // Complete with id and ancestor
- key = makeKey("bird", "owl").build();
- key = makeKey(key, "bird", 123).build();
- assertTrue(isValidKey(key));
-
- // Incomplete with ancestor
- key = makeKey("bird", "owl").build();
- key = makeKey(key, "bird").build();
- assertFalse(isValidKey(key));
-
- key = makeKey().build();
- assertFalse(isValidKey(key));
- }
-
- /**
- * Test that entities with incomplete keys cannot be updated.
- */
- @Test
- public void testAddEntitiesWithIncompleteKeys() throws Exception {
- Key key = makeKey("bird").build();
- Entity entity = Entity.newBuilder().setKey(key).build();
- UpsertFn upsertFn = new UpsertFn();
-
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Entities to be written to the Datastore must have complete keys");
-
- upsertFn.apply(entity);
- }
-
- @Test
- /**
- * Test that entities with valid keys are transformed to upsert mutations.
- */
- public void testAddEntities() throws Exception {
- Key key = makeKey("bird", "finch").build();
- Entity entity = Entity.newBuilder().setKey(key).build();
- UpsertFn upsertFn = new UpsertFn();
-
- Mutation exceptedMutation = makeUpsert(entity).build();
- assertEquals(upsertFn.apply(entity), exceptedMutation);
- }
-
- /**
- * Test that entities with incomplete keys cannot be deleted.
- */
- @Test
- public void testDeleteEntitiesWithIncompleteKeys() throws Exception {
- Key key = makeKey("bird").build();
- Entity entity = Entity.newBuilder().setKey(key).build();
- DeleteEntityFn deleteEntityFn = new DeleteEntityFn();
-
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Entities to be deleted from the Datastore must have complete keys");
-
- deleteEntityFn.apply(entity);
- }
-
- /**
- * Test that entities with valid keys are transformed to delete mutations.
- */
- @Test
- public void testDeleteEntities() throws Exception {
- Key key = makeKey("bird", "finch").build();
- Entity entity = Entity.newBuilder().setKey(key).build();
- DeleteEntityFn deleteEntityFn = new DeleteEntityFn();
-
- Mutation exceptedMutation = makeDelete(entity.getKey()).build();
- assertEquals(deleteEntityFn.apply(entity), exceptedMutation);
- }
-
- /**
- * Test that incomplete keys cannot be deleted.
- */
- @Test
- public void testDeleteIncompleteKeys() throws Exception {
- Key key = makeKey("bird").build();
- DeleteKeyFn deleteKeyFn = new DeleteKeyFn();
-
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Keys to be deleted from the Datastore must be complete");
-
- deleteKeyFn.apply(key);
- }
-
- /**
- * Test that valid keys are transformed to delete mutations.
- */
- @Test
- public void testDeleteKeys() throws Exception {
- Key key = makeKey("bird", "finch").build();
- DeleteKeyFn deleteKeyFn = new DeleteKeyFn();
-
- Mutation exceptedMutation = makeDelete(key).build();
- assertEquals(deleteKeyFn.apply(key), exceptedMutation);
- }
-
- @Test
- public void testDatastoreWriteFnDisplayData() {
- DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID);
- DisplayData displayData = DisplayData.from(datastoreWriter);
- assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
- }
-
- /** Tests {@link DatastoreWriterFn} with entities less than one batch. */
- @Test
- public void testDatatoreWriterFnWithOneBatch() throws Exception {
- datastoreWriterFnTest(100);
- }
-
- /** Tests {@link DatastoreWriterFn} with entities of more than one batches, but not a multiple. */
- @Test
- public void testDatatoreWriterFnWithMultipleBatches() throws Exception {
- datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 3 + 100);
- }
-
- /**
- * Tests {@link DatastoreWriterFn} with entities of several batches, using an exact multiple of
- * write batch size.
- */
- @Test
- public void testDatatoreWriterFnWithBatchesExactMultiple() throws Exception {
- datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 2);
- }
-
- // A helper method to test DatastoreWriterFn for various batch sizes.
- private void datastoreWriterFnTest(int numMutations) throws Exception {
- // Create the requested number of mutations.
- List<Mutation> mutations = new ArrayList<>(numMutations);
- for (int i = 0; i < numMutations; ++i) {
- mutations.add(
- makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build());
- }
-
- DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, mockDatastoreFactory);
- DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
- doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
- doFnTester.processBundle(mutations);
-
- int start = 0;
- while (start < numMutations) {
- int end = Math.min(numMutations, start + DATASTORE_BATCH_UPDATE_LIMIT);
- CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
- commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
- commitRequest.addAllMutations(mutations.subList(start, end));
- // Verify all the batch requests were made with the expected mutations.
- verify(mockDatastore, times(1)).commit(commitRequest.build());
- start = end;
- }
- }
-
- /**
- * Tests {@link V1Beta3.Read#getEstimatedSizeBytes} to fetch and return estimated size for a
- * query.
- */
- @Test
- public void testEstimatedSizeBytes() throws Exception {
- long entityBytes = 100L;
- // Per Kind statistics request and response
- RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE), NAMESPACE);
- RunQueryResponse statResponse = makeStatKindResponse(entityBytes);
-
- when(mockDatastore.runQuery(statRequest))
- .thenReturn(statResponse);
-
- assertEquals(entityBytes, getEstimatedSizeBytes(mockDatastore, QUERY, NAMESPACE));
- verify(mockDatastore, times(1)).runQuery(statRequest);
- }
-
- /**
- * Tests {@link SplitQueryFn} when number of query splits is specified.
- */
- @Test
- public void testSplitQueryFnWithNumSplits() throws Exception {
- int numSplits = 100;
- when(mockQuerySplitter.getSplits(
- eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class)))
- .thenReturn(splitQuery(QUERY, numSplits));
-
- SplitQueryFn splitQueryFn = new SplitQueryFn(v1Beta3Options, numSplits, mockDatastoreFactory);
- DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn);
- /**
- * Although Datastore client is marked transient in {@link SplitQueryFn}, when injected through
- * mock factory using a when clause for unit testing purposes, it is not serializable
- * because it doesn't have a no-arg constructor. Thus disabling the cloning to prevent the
- * doFn from being serialized.
- */
- doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
- List<KV<Integer, Query>> queries = doFnTester.processBundle(QUERY);
-
- assertEquals(queries.size(), numSplits);
- verifyUniqueKeys(queries);
- verify(mockQuerySplitter, times(1)).getSplits(
- eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class));
- verifyZeroInteractions(mockDatastore);
- }
-
- /**
- * Tests {@link SplitQueryFn} when no query splits is specified.
- */
- @Test
- public void testSplitQueryFnWithoutNumSplits() throws Exception {
- // Force SplitQueryFn to compute the number of query splits
- int numSplits = 0;
- int expectedNumSplits = 20;
- long entityBytes = expectedNumSplits * DEFAULT_BUNDLE_SIZE_BYTES;
-
- // Per Kind statistics request and response
- RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE), NAMESPACE);
- RunQueryResponse statResponse = makeStatKindResponse(entityBytes);
-
- when(mockDatastore.runQuery(statRequest))
- .thenReturn(statResponse);
- when(mockQuerySplitter.getSplits(
- eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class)))
- .thenReturn(splitQuery(QUERY, expectedNumSplits));
-
- SplitQueryFn splitQueryFn = new SplitQueryFn(v1Beta3Options, numSplits, mockDatastoreFactory);
- DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn);
- doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
- List<KV<Integer, Query>> queries = doFnTester.processBundle(QUERY);
-
- assertEquals(queries.size(), expectedNumSplits);
- verifyUniqueKeys(queries);
- verify(mockQuerySplitter, times(1)).getSplits(
- eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class));
- verify(mockDatastore, times(1)).runQuery(statRequest);
- }
-
- /**
- * Tests {@link V1Beta3.Read.SplitQueryFn} when the query has a user specified limit.
- */
- @Test
- public void testSplitQueryFnWithQueryLimit() throws Exception {
- Query queryWithLimit = QUERY.toBuilder().clone()
- .setLimit(Int32Value.newBuilder().setValue(1))
- .build();
-
- SplitQueryFn splitQueryFn = new SplitQueryFn(v1Beta3Options, 10, mockDatastoreFactory);
- DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn);
- doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
- List<KV<Integer, Query>> queries = doFnTester.processBundle(queryWithLimit);
-
- assertEquals(queries.size(), 1);
- verifyUniqueKeys(queries);
- verifyNoMoreInteractions(mockDatastore);
- verifyNoMoreInteractions(mockQuerySplitter);
- }
-
- /** Tests {@link ReadFn} with a query limit less than one batch. */
- @Test
- public void testReadFnWithOneBatch() throws Exception {
- readFnTest(5);
- }
-
- /** Tests {@link ReadFn} with a query limit more than one batch, and not a multiple. */
- @Test
- public void testReadFnWithMultipleBatches() throws Exception {
- readFnTest(QUERY_BATCH_LIMIT + 5);
- }
-
- /** Tests {@link ReadFn} for several batches, using an exact multiple of batch size results. */
- @Test
- public void testReadFnWithBatchesExactMultiple() throws Exception {
- readFnTest(5 * QUERY_BATCH_LIMIT);
- }
-
- /** Helper Methods */
-
- /** A helper function that verifies if all the queries have unique keys. */
- private void verifyUniqueKeys(List<KV<Integer, Query>> queries) {
- Set<Integer> keys = new HashSet<>();
- for (KV<Integer, Query> kv: queries) {
- keys.add(kv.getKey());
- }
- assertEquals(keys.size(), queries.size());
- }
-
- /**
- * A helper function that creates mock {@link Entity} results in response to a query. Always
- * indicates that more results are available, unless the batch is limited to fewer than
- * {@link V1Beta3.Read#QUERY_BATCH_LIMIT} results.
- */
- private static RunQueryResponse mockResponseForQuery(Query q) {
- // Every query V1Beta3 sends should have a limit.
- assertTrue(q.hasLimit());
-
- // The limit should be in the range [1, QUERY_BATCH_LIMIT]
- int limit = q.getLimit().getValue();
- assertThat(limit, greaterThanOrEqualTo(1));
- assertThat(limit, lessThanOrEqualTo(QUERY_BATCH_LIMIT));
-
- // Create the requested number of entities.
- List<EntityResult> entities = new ArrayList<>(limit);
- for (int i = 0; i < limit; ++i) {
- entities.add(
- EntityResult.newBuilder()
- .setEntity(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)))
- .build());
- }
-
- // Fill out the other parameters on the returned result batch.
- RunQueryResponse.Builder ret = RunQueryResponse.newBuilder();
- ret.getBatchBuilder()
- .addAllEntityResults(entities)
- .setEntityResultType(EntityResult.ResultType.FULL)
- .setMoreResults(
- limit == QUERY_BATCH_LIMIT
- ? QueryResultBatch.MoreResultsType.NOT_FINISHED
- : QueryResultBatch.MoreResultsType.NO_MORE_RESULTS);
-
- return ret.build();
- }
-
- /** Helper function to run a test reading from a {@link ReadFn}. */
- private void readFnTest(int numEntities) throws Exception {
- // An empty query to read entities.
- Query query = Query.newBuilder().setLimit(
- Int32Value.newBuilder().setValue(numEntities)).build();
-
- // Use mockResponseForQuery to generate results.
- when(mockDatastore.runQuery(any(RunQueryRequest.class)))
- .thenAnswer(new Answer<RunQueryResponse>() {
- @Override
- public RunQueryResponse answer(InvocationOnMock invocationOnMock) throws Throwable {
- Query q = ((RunQueryRequest) invocationOnMock.getArguments()[0]).getQuery();
- return mockResponseForQuery(q);
- }
- });
-
- ReadFn readFn = new ReadFn(v1Beta3Options, mockDatastoreFactory);
- DoFnTester<Query, Entity> doFnTester = DoFnTester.of(readFn);
- /**
- * Although Datastore client is marked transient in {@link ReadFn}, when injected through
- * mock factory using a when clause for unit testing purposes, it is not serializable
- * because it doesn't have a no-arg constructor. Thus disabling the cloning to prevent the
- * test object from being serialized.
- */
- doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
- List<Entity> entities = doFnTester.processBundle(query);
-
- int expectedNumCallsToRunQuery = (int) Math.ceil((double) numEntities / QUERY_BATCH_LIMIT);
- verify(mockDatastore, times(expectedNumCallsToRunQuery)).runQuery(any(RunQueryRequest.class));
- // Validate the number of results.
- assertEquals(numEntities, entities.size());
- }
-
- /** Builds a per-kind statistics response with the given entity size. */
- private static RunQueryResponse makeStatKindResponse(long entitySizeInBytes) {
- RunQueryResponse.Builder timestampResponse = RunQueryResponse.newBuilder();
- Entity.Builder entity = Entity.newBuilder();
- entity.setKey(makeKey("dummyKind", "dummyId"));
- entity.getMutableProperties().put("entity_bytes", makeValue(entitySizeInBytes).build());
- EntityResult.Builder entityResult = EntityResult.newBuilder();
- entityResult.setEntity(entity);
- QueryResultBatch.Builder batch = QueryResultBatch.newBuilder();
- batch.addEntityResults(entityResult);
- timestampResponse.setBatch(batch);
- return timestampResponse.build();
- }
-
- /** Builds a per-kind statistics query for the given timestamp and namespace. */
- private static Query makeStatKindQuery(String namespace) {
- Query.Builder statQuery = Query.newBuilder();
- if (namespace == null) {
- statQuery.addKindBuilder().setName("__Stat_Kind__");
- } else {
- statQuery.addKindBuilder().setName("__Ns_Stat_Kind__");
- }
- statQuery.setFilter(makeFilter("kind_name", EQUAL, makeValue(KIND)).build());
- statQuery.addOrder(makeOrder("timestamp", DESCENDING));
- statQuery.setLimit(Int32Value.newBuilder().setValue(1));
- return statQuery.build();
- }
-
- /** Generate dummy query splits. */
- private List<Query> splitQuery(Query query, int numSplits) {
- List<Query> queries = new LinkedList<>();
- for (int i = 0; i < numSplits; i++) {
- queries.add(query.toBuilder().clone().build());
- }
- return queries;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestOptions.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestOptions.java
deleted file mode 100644
index 099ebe0..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestOptions.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.datastore;
-
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
-
-import javax.annotation.Nullable;
-
-/**
- * V1Beta3 Datastore related pipeline options.
- */
-public interface V1Beta3TestOptions extends TestPipelineOptions {
- @Description("Project ID to read from datastore")
- @Default.String("apache-beam-testing")
- String getProject();
- void setProject(String value);
-
- @Description("Datastore Entity kind")
- @Default.String("beam-test")
- String getKind();
- void setKind(String value);
-
- @Description("Datastore Namespace")
- String getNamespace();
- void setNamespace(@Nullable String value);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
deleted file mode 100644
index 7eaf23e..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.datastore;
-
-import static com.google.datastore.v1beta3.QueryResultBatch.MoreResultsType.NOT_FINISHED;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeDelete;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
-
-import org.apache.beam.sdk.options.GcpOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
-import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
-
-import com.google.api.client.auth.oauth2.Credential;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.datastore.v1beta3.CommitRequest;
-import com.google.datastore.v1beta3.Entity;
-import com.google.datastore.v1beta3.EntityResult;
-import com.google.datastore.v1beta3.Key;
-import com.google.datastore.v1beta3.Key.PathElement;
-import com.google.datastore.v1beta3.Mutation;
-import com.google.datastore.v1beta3.PropertyFilter;
-import com.google.datastore.v1beta3.Query;
-import com.google.datastore.v1beta3.QueryResultBatch;
-import com.google.datastore.v1beta3.RunQueryRequest;
-import com.google.datastore.v1beta3.RunQueryResponse;
-import com.google.datastore.v1beta3.client.Datastore;
-import com.google.datastore.v1beta3.client.DatastoreException;
-import com.google.datastore.v1beta3.client.DatastoreFactory;
-import com.google.datastore.v1beta3.client.DatastoreOptions;
-import com.google.protobuf.Int32Value;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.UUID;
-import javax.annotation.Nullable;
-
-class V1Beta3TestUtil {
- private static final Logger LOG = LoggerFactory.getLogger(V1Beta3TestUtil.class);
-
- /**
- * A helper function to create the ancestor key for all created and queried entities.
- */
- static Key makeAncestorKey(@Nullable String namespace, String kind, String ancestor) {
- Key.Builder keyBuilder = makeKey(kind, ancestor);
- if (namespace != null) {
- keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
- }
- return keyBuilder.build();
- }
-
- /**
- * Build a datastore ancestor query for the specified kind, namespace and ancestor.
- */
- static Query makeAncestorKindQuery(String kind, @Nullable String namespace, String ancestor) {
- Query.Builder q = Query.newBuilder();
- q.addKindBuilder().setName(kind);
- q.setFilter(makeFilter(
- "__key__",
- PropertyFilter.Operator.HAS_ANCESTOR,
- makeValue(makeAncestorKey(namespace, kind, ancestor))));
- return q.build();
- }
-
- /**
- * Build an entity for the given ancestorKey, kind, namespace and value.
- */
- static Entity makeEntity(Long value, Key ancestorKey, String kind, @Nullable String namespace) {
- Entity.Builder entityBuilder = Entity.newBuilder();
- Key.Builder keyBuilder = makeKey(ancestorKey, kind, UUID.randomUUID().toString());
- // NOTE: Namespace is not inherited between keys created with DatastoreHelper.makeKey, so
- // we must set the namespace on keyBuilder. TODO: Once partitionId inheritance is added,
- // we can simplify this code.
- if (namespace != null) {
- keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
- }
-
- entityBuilder.setKey(keyBuilder.build());
- entityBuilder.getMutableProperties().put("value", makeValue(value).build());
- return entityBuilder.build();
- }
-
- /**
- * A DoFn that creates entity for a long number.
- */
- static class CreateEntityFn extends DoFn<Long, Entity> {
- private final String kind;
- @Nullable
- private final String namespace;
- private Key ancestorKey;
-
- CreateEntityFn(String kind, @Nullable String namespace, String ancestor) {
- this.kind = kind;
- this.namespace = namespace;
- // Build the ancestor key for all created entities once, including the namespace.
- ancestorKey = makeAncestorKey(namespace, kind, ancestor);
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- c.output(makeEntity(c.element(), ancestorKey, kind, namespace));
- }
- }
-
- /**
- * Build a new datastore client.
- */
- static Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) {
- DatastoreOptions.Builder builder =
- new DatastoreOptions.Builder()
- .projectId(projectId)
- .initializer(
- new RetryHttpRequestInitializer()
- );
-
- Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
- if (credential != null) {
- builder.credential(credential);
- }
- return DatastoreFactory.get().create(builder.build());
- }
-
- /**
- * Build a datastore query request.
- */
- private static RunQueryRequest makeRequest(Query query, @Nullable String namespace) {
- RunQueryRequest.Builder requestBuilder = RunQueryRequest.newBuilder().setQuery(query);
- if (namespace != null) {
- requestBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
- }
- return requestBuilder.build();
- }
-
- /**
- * Delete all entities with the given ancestor.
- */
- static void deleteAllEntities(V1Beta3TestOptions options, String ancestor) throws Exception {
- Datastore datastore = getDatastore(options, options.getProject());
- Query query = V1Beta3TestUtil.makeAncestorKindQuery(
- options.getKind(), options.getNamespace(), ancestor);
-
- V1Beta3TestReader reader = new V1Beta3TestReader(datastore, query, options.getNamespace());
- V1Beta3TestWriter writer = new V1Beta3TestWriter(datastore, new DeleteMutationBuilder());
-
- long numEntities = 0;
- while (reader.advance()) {
- Entity entity = reader.getCurrent();
- numEntities++;
- writer.write(entity);
- }
-
- writer.close();
- LOG.info("Successfully deleted {} entities", numEntities);
- }
-
- /**
- * Returns the total number of entities for the given datastore.
- */
- static long countEntities(V1Beta3TestOptions options, String ancestor) throws Exception {
- // Read from datastore.
- Datastore datastore = V1Beta3TestUtil.getDatastore(options, options.getProject());
- Query query = V1Beta3TestUtil.makeAncestorKindQuery(
- options.getKind(), options.getNamespace(), ancestor);
-
- V1Beta3TestReader reader = new V1Beta3TestReader(datastore, query, options.getNamespace());
-
- long numEntitiesRead = 0;
- while (reader.advance()) {
- reader.getCurrent();
- numEntitiesRead++;
- }
- return numEntitiesRead;
- }
-
- /**
- * An interface to represent any datastore mutation operation.
- * Mutation operations include insert, delete, upsert, update.
- */
- interface MutationBuilder {
- Mutation.Builder apply(Entity entity);
- }
-
- /**
- *A MutationBuilder that performs upsert operation.
- */
- static class UpsertMutationBuilder implements MutationBuilder {
- public Mutation.Builder apply(Entity entity) {
- return makeUpsert(entity);
- }
- }
-
- /**
- * A MutationBuilder that performs delete operation.
- */
- static class DeleteMutationBuilder implements MutationBuilder {
- public Mutation.Builder apply(Entity entity) {
- return makeDelete(entity.getKey());
- }
- }
-
- /**
- * A helper class to write entities to datastore.
- */
- static class V1Beta3TestWriter {
- private static final Logger LOG = LoggerFactory.getLogger(V1Beta3TestWriter.class);
- // Limits the number of entities updated per batch
- private static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
- // Number of times to retry on update failure
- private static final int MAX_RETRIES = 5;
- //Initial backoff time for exponential backoff for retry attempts.
- private static final int INITIAL_BACKOFF_MILLIS = 5000;
-
- // Returns true if a Datastore key is complete. A key is complete if its last element
- // has either an id or a name.
- static boolean isValidKey(Key key) {
- List<PathElement> elementList = key.getPathList();
- if (elementList.isEmpty()) {
- return false;
- }
- PathElement lastElement = elementList.get(elementList.size() - 1);
- return (lastElement.getId() != 0 || !lastElement.getName().isEmpty());
- }
-
- private final Datastore datastore;
- private final MutationBuilder mutationBuilder;
- private final List<Entity> entities = new ArrayList<>();
-
- V1Beta3TestWriter(Datastore datastore, MutationBuilder mutationBuilder) {
- this.datastore = datastore;
- this.mutationBuilder = mutationBuilder;
- }
-
- void write(Entity value) throws Exception {
- // Verify that the entity to write has a complete key.
- if (!isValidKey(value.getKey())) {
- throw new IllegalArgumentException(
- "Entities to be written to the Datastore must have complete keys");
- }
-
- entities.add(value);
-
- if (entities.size() >= DATASTORE_BATCH_UPDATE_LIMIT) {
- flushBatch();
- }
- }
-
- void close() throws Exception {
- // flush any remaining entities
- if (entities.size() > 0) {
- flushBatch();
- }
- }
-
- // commit the list of entities to datastore
- private void flushBatch() throws DatastoreException, IOException, InterruptedException {
- LOG.info("Writing batch of {} entities", entities.size());
- Sleeper sleeper = Sleeper.DEFAULT;
- BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
-
- while (true) {
- // Batch mutate entities.
- try {
- CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
- for (Entity entity: entities) {
- commitRequest.addMutations(mutationBuilder.apply(entity));
- }
- commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
- datastore.commit(commitRequest.build());
- // Break if the commit threw no exception.
- break;
- } catch (DatastoreException exception) {
- LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
- exception.getMessage());
- if (!BackOffUtils.next(sleeper, backoff)) {
- LOG.error("Aborting after {} retries.", MAX_RETRIES);
- throw exception;
- }
- }
- }
- LOG.info("Successfully wrote {} entities", entities.size());
- entities.clear();
- }
- }
-
- /**
- * A helper class to read entities from datastore.
- */
- static class V1Beta3TestReader {
- private static final int QUERY_BATCH_LIMIT = 500;
- private final Datastore datastore;
- private final Query query;
- @Nullable
- private final String namespace;
- private boolean moreResults;
- private java.util.Iterator<EntityResult> entities;
- // Current batch of query results
- private QueryResultBatch currentBatch;
- private Entity currentEntity;
-
- V1Beta3TestReader(Datastore datastore, Query query, @Nullable String namespace) {
- this.datastore = datastore;
- this.query = query;
- this.namespace = namespace;
- }
-
- Entity getCurrent() {
- return currentEntity;
- }
-
- boolean advance() throws IOException {
- if (entities == null || (!entities.hasNext() && moreResults)) {
- try {
- entities = getIteratorAndMoveCursor();
- } catch (DatastoreException e) {
- throw new IOException(e);
- }
- }
-
- if (entities == null || !entities.hasNext()) {
- currentEntity = null;
- return false;
- }
-
- currentEntity = entities.next().getEntity();
- return true;
- }
-
- // Read the next batch of query results.
- private Iterator<EntityResult> getIteratorAndMoveCursor() throws DatastoreException {
- Query.Builder query = this.query.toBuilder().clone();
- query.setLimit(Int32Value.newBuilder().setValue(QUERY_BATCH_LIMIT));
- if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) {
- query.setStartCursor(currentBatch.getEndCursor());
- }
-
- RunQueryRequest request = makeRequest(query.build(), namespace);
- RunQueryResponse response = datastore.runQuery(request);
-
- currentBatch = response.getBatch();
-
- int numFetch = currentBatch.getEntityResultsCount();
- // All indications from the API are that there are/may be more results.
- moreResults = ((numFetch == QUERY_BATCH_LIMIT)
- || (currentBatch.getMoreResults() == NOT_FINISHED));
-
- // May receive a batch of 0 results if the number of records is a multiple
- // of the request limit.
- if (numFetch == 0) {
- return null;
- }
-
- return currentBatch.getEntityResultsList().iterator();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3WriteIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3WriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3WriteIT.java
deleted file mode 100644
index 782065f..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3WriteIT.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.datastore;
-
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.countEntities;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.deleteAllEntities;
-import static org.junit.Assert.assertEquals;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.CountingInput;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.CreateEntityFn;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.ParDo;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.UUID;
-
-/**
- * End-to-end tests for Datastore V1Beta3.Write.
- */
-@RunWith(JUnit4.class)
-public class V1Beta3WriteIT {
- private V1Beta3TestOptions options;
- private String ancestor;
- private final long numEntities = 1000;
-
- @Before
- public void setup() {
- PipelineOptionsFactory.register(V1Beta3TestOptions.class);
- options = TestPipeline.testingPipelineOptions().as(V1Beta3TestOptions.class);
- ancestor = UUID.randomUUID().toString();
- }
-
- /**
- * An end-to-end test for {@link V1Beta3.Write}.
- *
- * Write some test entities to datastore through a dataflow pipeline.
- * Read and count all the entities. Verify that the count matches the
- * number of entities written.
- */
- @Test
- public void testE2EV1Beta3Write() throws Exception {
- Pipeline p = Pipeline.create(options);
-
- // Write to datastore
- p.apply(CountingInput.upTo(numEntities))
- .apply(ParDo.of(new CreateEntityFn(
- options.getKind(), options.getNamespace(), ancestor)))
- .apply(DatastoreIO.v1beta3().write().withProjectId(options.getProject()));
-
- p.run();
-
- // Count number of entities written to datastore.
- long numEntitiesWritten = countEntities(options, ancestor);
-
- assertEquals(numEntitiesWritten, numEntities);
- }
-
- @After
- public void tearDown() throws Exception {
- deleteAllEntities(options, ancestor);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
new file mode 100644
index 0000000..8fedc77
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.deleteAllEntities;
+import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.getDatastore;
+import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.makeAncestorKey;
+import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.makeEntity;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.UpsertMutationBuilder;
+import org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.V1TestWriter;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.Key;
+import com.google.datastore.v1.Query;
+import com.google.datastore.v1.client.Datastore;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.UUID;
+
+/**
+ * End-to-end tests for Datastore DatastoreV1.Read.
+ */
+@RunWith(JUnit4.class)
+public class V1ReadIT {
+ private V1TestOptions options;
+ private String ancestor;
+ private final long numEntities = 1000;
+
+ @Before
+ public void setup() {
+ PipelineOptionsFactory.register(V1TestOptions.class);
+ options = TestPipeline.testingPipelineOptions().as(V1TestOptions.class);
+ ancestor = UUID.randomUUID().toString();
+ }
+
+ /**
+ * An end-to-end test for {@link DatastoreV1.Read}.
+ *
+ * Write some test entities to datastore and then run a dataflow pipeline that
+ * reads and counts the total number of entities. Verify that the count matches the
+ * number of entities written.
+ */
+ @Test
+ public void testE2EV1Read() throws Exception {
+ // Create entities and write them to datastore
+ writeEntitiesToDatastore(options, ancestor, numEntities);
+
+ // Read from datastore
+ Query query = V1TestUtil.makeAncestorKindQuery(
+ options.getKind(), options.getNamespace(), ancestor);
+
+ DatastoreV1.Read read = DatastoreIO.v1().read()
+ .withProjectId(options.getProject())
+ .withQuery(query)
+ .withNamespace(options.getNamespace());
+
+ // Count the total number of entities
+ Pipeline p = Pipeline.create(options);
+ PCollection<Long> count = p
+ .apply(read)
+ .apply(Count.<Entity>globally());
+
+ PAssert.thatSingleton(count).isEqualTo(numEntities);
+ p.run();
+ }
+
+ // Creates entities and write them to datastore
+ private static void writeEntitiesToDatastore(V1TestOptions options, String ancestor,
+ long numEntities) throws Exception {
+ Datastore datastore = getDatastore(options, options.getProject());
+ // Write test entities to datastore
+ V1TestWriter writer = new V1TestWriter(datastore, new UpsertMutationBuilder());
+ Key ancestorKey = makeAncestorKey(options.getNamespace(), options.getKind(), ancestor);
+
+ for (long i = 0; i < numEntities; i++) {
+ Entity entity = makeEntity(i, ancestorKey, options.getKind(), options.getNamespace());
+ writer.write(entity);
+ }
+ writer.close();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ deleteAllEntities(options, ancestor);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestOptions.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestOptions.java
new file mode 100644
index 0000000..360855f
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestOptions.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+
+import javax.annotation.Nullable;
+
+/**
+ * DatastoreV1 Datastore related pipeline options.
+ */
+public interface V1TestOptions extends TestPipelineOptions {
+ @Description("Project ID to read from datastore")
+ @Default.String("apache-beam-testing")
+ String getProject();
+ void setProject(String value);
+
+ @Description("Datastore Entity kind")
+ @Default.String("beam-test")
+ String getKind();
+ void setKind(String value);
+
+ @Description("Datastore Namespace")
+ String getNamespace();
+ void setNamespace(@Nullable String value);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
new file mode 100644
index 0000000..1e323ec
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static com.google.datastore.v1.QueryResultBatch.MoreResultsType.NOT_FINISHED;
+import static com.google.datastore.v1.client.DatastoreHelper.makeDelete;
+import static com.google.datastore.v1.client.DatastoreHelper.makeFilter;
+import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
+import static com.google.datastore.v1.client.DatastoreHelper.makeUpsert;
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
+
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
+
+import com.google.api.client.auth.oauth2.Credential;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+import com.google.datastore.v1.CommitRequest;
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.EntityResult;
+import com.google.datastore.v1.Key;
+import com.google.datastore.v1.Key.PathElement;
+import com.google.datastore.v1.Mutation;
+import com.google.datastore.v1.PropertyFilter;
+import com.google.datastore.v1.Query;
+import com.google.datastore.v1.QueryResultBatch;
+import com.google.datastore.v1.RunQueryRequest;
+import com.google.datastore.v1.RunQueryResponse;
+import com.google.datastore.v1.client.Datastore;
+import com.google.datastore.v1.client.DatastoreException;
+import com.google.datastore.v1.client.DatastoreFactory;
+import com.google.datastore.v1.client.DatastoreOptions;
+import com.google.protobuf.Int32Value;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import javax.annotation.Nullable;
+
+class V1TestUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(V1TestUtil.class);
+
+ /**
+ * A helper function to create the ancestor key for all created and queried entities.
+ */
+ static Key makeAncestorKey(@Nullable String namespace, String kind, String ancestor) {
+ Key.Builder keyBuilder = makeKey(kind, ancestor);
+ if (namespace != null) {
+ keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
+ }
+ return keyBuilder.build();
+ }
+
+ /**
+ * Build a datastore ancestor query for the specified kind, namespace and ancestor.
+ */
+ static Query makeAncestorKindQuery(String kind, @Nullable String namespace, String ancestor) {
+ Query.Builder q = Query.newBuilder();
+ q.addKindBuilder().setName(kind);
+ q.setFilter(makeFilter(
+ "__key__",
+ PropertyFilter.Operator.HAS_ANCESTOR,
+ makeValue(makeAncestorKey(namespace, kind, ancestor))));
+ return q.build();
+ }
+
+ /**
+ * Build an entity for the given ancestorKey, kind, namespace and value.
+ */
+ static Entity makeEntity(Long value, Key ancestorKey, String kind, @Nullable String namespace) {
+ Entity.Builder entityBuilder = Entity.newBuilder();
+ Key.Builder keyBuilder = makeKey(ancestorKey, kind, UUID.randomUUID().toString());
+ // NOTE: Namespace is not inherited between keys created with DatastoreHelper.makeKey, so
+ // we must set the namespace on keyBuilder. TODO: Once partitionId inheritance is added,
+ // we can simplify this code.
+ if (namespace != null) {
+ keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
+ }
+
+ entityBuilder.setKey(keyBuilder.build());
+ entityBuilder.getMutableProperties().put("value", makeValue(value).build());
+ return entityBuilder.build();
+ }
+
+ /**
+ * A DoFn that creates entity for a long number.
+ */
+ static class CreateEntityFn extends DoFn<Long, Entity> {
+ private final String kind;
+ @Nullable
+ private final String namespace;
+ private Key ancestorKey;
+
+ CreateEntityFn(String kind, @Nullable String namespace, String ancestor) {
+ this.kind = kind;
+ this.namespace = namespace;
+ // Build the ancestor key for all created entities once, including the namespace.
+ ancestorKey = makeAncestorKey(namespace, kind, ancestor);
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ c.output(makeEntity(c.element(), ancestorKey, kind, namespace));
+ }
+ }
+
+ /**
+ * Build a new datastore client.
+ */
+ static Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) {
+ DatastoreOptions.Builder builder =
+ new DatastoreOptions.Builder()
+ .projectId(projectId)
+ .initializer(
+ new RetryHttpRequestInitializer()
+ );
+
+ Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
+ if (credential != null) {
+ builder.credential(credential);
+ }
+ return DatastoreFactory.get().create(builder.build());
+ }
+
+ /**
+ * Build a datastore query request.
+ */
+ private static RunQueryRequest makeRequest(Query query, @Nullable String namespace) {
+ RunQueryRequest.Builder requestBuilder = RunQueryRequest.newBuilder().setQuery(query);
+ if (namespace != null) {
+ requestBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
+ }
+ return requestBuilder.build();
+ }
+
+ /**
+ * Delete all entities with the given ancestor.
+ */
+ static void deleteAllEntities(V1TestOptions options, String ancestor) throws Exception {
+ Datastore datastore = getDatastore(options, options.getProject());
+ Query query = V1TestUtil.makeAncestorKindQuery(
+ options.getKind(), options.getNamespace(), ancestor);
+
+ V1TestReader reader = new V1TestReader(datastore, query, options.getNamespace());
+ V1TestWriter writer = new V1TestWriter(datastore, new DeleteMutationBuilder());
+
+ long numEntities = 0;
+ while (reader.advance()) {
+ Entity entity = reader.getCurrent();
+ numEntities++;
+ writer.write(entity);
+ }
+
+ writer.close();
+ LOG.info("Successfully deleted {} entities", numEntities);
+ }
+
+ /**
+ * Returns the total number of entities for the given datastore.
+ */
+ static long countEntities(V1TestOptions options, String ancestor) throws Exception {
+ // Read from datastore.
+ Datastore datastore = V1TestUtil.getDatastore(options, options.getProject());
+ Query query = V1TestUtil.makeAncestorKindQuery(
+ options.getKind(), options.getNamespace(), ancestor);
+
+ V1TestReader reader = new V1TestReader(datastore, query, options.getNamespace());
+
+ long numEntitiesRead = 0;
+ while (reader.advance()) {
+ reader.getCurrent();
+ numEntitiesRead++;
+ }
+ return numEntitiesRead;
+ }
+
+ /**
+ * An interface to represent any datastore mutation operation.
+ * Mutation operations include insert, delete, upsert, update.
+ */
+ interface MutationBuilder {
+ Mutation.Builder apply(Entity entity);
+ }
+
+ /**
+ *A MutationBuilder that performs upsert operation.
+ */
+ static class UpsertMutationBuilder implements MutationBuilder {
+ public Mutation.Builder apply(Entity entity) {
+ return makeUpsert(entity);
+ }
+ }
+
+ /**
+ * A MutationBuilder that performs delete operation.
+ */
+ static class DeleteMutationBuilder implements MutationBuilder {
+ public Mutation.Builder apply(Entity entity) {
+ return makeDelete(entity.getKey());
+ }
+ }
+
+ /**
+ * A helper class to write entities to datastore.
+ */
+ static class V1TestWriter {
+ private static final Logger LOG = LoggerFactory.getLogger(V1TestWriter.class);
+ // Limits the number of entities updated per batch
+ private static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
+ // Number of times to retry on update failure
+ private static final int MAX_RETRIES = 5;
+ //Initial backoff time for exponential backoff for retry attempts.
+ private static final int INITIAL_BACKOFF_MILLIS = 5000;
+
+ // Returns true if a Datastore key is complete. A key is complete if its last element
+ // has either an id or a name.
+ static boolean isValidKey(Key key) {
+ List<PathElement> elementList = key.getPathList();
+ if (elementList.isEmpty()) {
+ return false;
+ }
+ PathElement lastElement = elementList.get(elementList.size() - 1);
+ return (lastElement.getId() != 0 || !lastElement.getName().isEmpty());
+ }
+
+ private final Datastore datastore;
+ private final MutationBuilder mutationBuilder;
+ private final List<Entity> entities = new ArrayList<>();
+
+ V1TestWriter(Datastore datastore, MutationBuilder mutationBuilder) {
+ this.datastore = datastore;
+ this.mutationBuilder = mutationBuilder;
+ }
+
+ void write(Entity value) throws Exception {
+ // Verify that the entity to write has a complete key.
+ if (!isValidKey(value.getKey())) {
+ throw new IllegalArgumentException(
+ "Entities to be written to the Datastore must have complete keys");
+ }
+
+ entities.add(value);
+
+ if (entities.size() >= DATASTORE_BATCH_UPDATE_LIMIT) {
+ flushBatch();
+ }
+ }
+
+ void close() throws Exception {
+ // flush any remaining entities
+ if (entities.size() > 0) {
+ flushBatch();
+ }
+ }
+
+ // commit the list of entities to datastore
+ private void flushBatch() throws DatastoreException, IOException, InterruptedException {
+ LOG.info("Writing batch of {} entities", entities.size());
+ Sleeper sleeper = Sleeper.DEFAULT;
+ BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
+
+ while (true) {
+ // Batch mutate entities.
+ try {
+ CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+ for (Entity entity: entities) {
+ commitRequest.addMutations(mutationBuilder.apply(entity));
+ }
+ commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+ datastore.commit(commitRequest.build());
+ // Break if the commit threw no exception.
+ break;
+ } catch (DatastoreException exception) {
+ LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
+ exception.getMessage());
+ if (!BackOffUtils.next(sleeper, backoff)) {
+ LOG.error("Aborting after {} retries.", MAX_RETRIES);
+ throw exception;
+ }
+ }
+ }
+ LOG.info("Successfully wrote {} entities", entities.size());
+ entities.clear();
+ }
+ }
+
+ /**
+ * A helper class to read entities from datastore.
+ */
+ static class V1TestReader {
+ private static final int QUERY_BATCH_LIMIT = 500;
+ private final Datastore datastore;
+ private final Query query;
+ @Nullable
+ private final String namespace;
+ private boolean moreResults;
+ private java.util.Iterator<EntityResult> entities;
+ // Current batch of query results
+ private QueryResultBatch currentBatch;
+ private Entity currentEntity;
+
+ V1TestReader(Datastore datastore, Query query, @Nullable String namespace) {
+ this.datastore = datastore;
+ this.query = query;
+ this.namespace = namespace;
+ }
+
+ Entity getCurrent() {
+ return currentEntity;
+ }
+
+ boolean advance() throws IOException {
+ if (entities == null || (!entities.hasNext() && moreResults)) {
+ try {
+ entities = getIteratorAndMoveCursor();
+ } catch (DatastoreException e) {
+ throw new IOException(e);
+ }
+ }
+
+ if (entities == null || !entities.hasNext()) {
+ currentEntity = null;
+ return false;
+ }
+
+ currentEntity = entities.next().getEntity();
+ return true;
+ }
+
+ // Read the next batch of query results.
+ private Iterator<EntityResult> getIteratorAndMoveCursor() throws DatastoreException {
+ Query.Builder query = this.query.toBuilder().clone();
+ query.setLimit(Int32Value.newBuilder().setValue(QUERY_BATCH_LIMIT));
+ if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) {
+ query.setStartCursor(currentBatch.getEndCursor());
+ }
+
+ RunQueryRequest request = makeRequest(query.build(), namespace);
+ RunQueryResponse response = datastore.runQuery(request);
+
+ currentBatch = response.getBatch();
+
+ int numFetch = currentBatch.getEntityResultsCount();
+ // All indications from the API are that there are/may be more results.
+ moreResults = ((numFetch == QUERY_BATCH_LIMIT)
+ || (currentBatch.getMoreResults() == NOT_FINISHED));
+
+ // May receive a batch of 0 results if the number of records is a multiple
+ // of the request limit.
+ if (numFetch == 0) {
+ return null;
+ }
+
+ return currentBatch.getEntityResultsList().iterator();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
new file mode 100644
index 0000000..b97c05c
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.countEntities;
+import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.deleteAllEntities;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.CreateEntityFn;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.ParDo;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.UUID;
+
+/**
+ * End-to-end tests for Datastore DatastoreV1.Write.
+ */
+@RunWith(JUnit4.class)
+public class V1WriteIT {
+ private V1TestOptions options;
+ private String ancestor;
+ private final long numEntities = 1000;
+
+ @Before
+ public void setup() {
+ PipelineOptionsFactory.register(V1TestOptions.class);
+ options = TestPipeline.testingPipelineOptions().as(V1TestOptions.class);
+ ancestor = UUID.randomUUID().toString();
+ }
+
+ /**
+ * An end-to-end test for {@link DatastoreV1.Write}.
+ *
+ * Write some test entities to datastore through a dataflow pipeline.
+ * Read and count all the entities. Verify that the count matches the
+ * number of entities written.
+ */
+ @Test
+ public void testE2EV1Write() throws Exception {
+ Pipeline p = Pipeline.create(options);
+
+ // Write to datastore
+ p.apply(CountingInput.upTo(numEntities))
+ .apply(ParDo.of(new CreateEntityFn(
+ options.getKind(), options.getNamespace(), ancestor)))
+ .apply(DatastoreIO.v1().write().withProjectId(options.getProject()));
+
+ p.run();
+
+ // Count number of entities written to datastore.
+ long numEntitiesWritten = countEntities(options, ancestor);
+
+ assertEquals(numEntitiesWritten, numEntities);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ deleteAllEntities(options, ancestor);
+ }
+}