You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/18 00:45:35 UTC
[2/2] incubator-beam git commit: DatastoreIO Sink as ParDo
DatastoreIO Sink as ParDo
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a0361ae9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a0361ae9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a0361ae9
Branch: refs/heads/master
Commit: a0361ae99e9e39bb5ff9766508501932416129ec
Parents: a07648b
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Mon Aug 15 15:28:07 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Aug 17 17:45:05 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/datastore/V1Beta3.java | 376 +++++++------------
.../beam/sdk/io/gcp/datastore/V1Beta3Test.java | 88 +++--
2 files changed, 195 insertions(+), 269 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0361ae9/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
index 052feb3..0d2e2cb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
@@ -30,10 +30,6 @@ import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert;
import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.io.Sink.WriteOperation;
-import org.apache.beam.sdk.io.Sink.Writer;
import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Create;
@@ -167,7 +163,8 @@ public class V1Beta3 {
* Datastore has a limit of 500 mutations per batch operation, so we flush
* changes to Datastore every 500 entities.
*/
- private static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
+ @VisibleForTesting
+ static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
/**
* Returns an empty {@link V1Beta3.Read} builder. Configure the source {@code projectId},
@@ -634,42 +631,8 @@ public class V1Beta3 {
}
}
}
-
- /**
- * A wrapper factory class for Datastore singleton classes {@link DatastoreFactory} and
- * {@link QuerySplitter}
- *
- * <p>{@link DatastoreFactory} and {@link QuerySplitter} are not java serializable, hence
- * wrapping them under this class, which implements {@link Serializable}.
- */
- @VisibleForTesting
- static class V1Beta3DatastoreFactory implements Serializable {
-
- /** Builds a Datastore client for the given pipeline options and project. */
- public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) {
- DatastoreOptions.Builder builder =
- new DatastoreOptions.Builder()
- .projectId(projectId)
- .initializer(
- new RetryHttpRequestInitializer()
- );
-
- Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
- if (credential != null) {
- builder.credential(credential);
- }
-
- return DatastoreFactory.get().create(builder.build());
- }
-
- /** Builds a Datastore {@link QuerySplitter}. */
- public QuerySplitter getQuerySplitter() {
- return DatastoreHelper.getQuerySplitter();
- }
- }
}
-
/**
* Returns an empty {@link V1Beta3.Write} builder. Configure the destination
* {@code projectId} using {@link V1Beta3.Write#withProjectId}.
@@ -705,8 +668,8 @@ public class V1Beta3 {
@Override
public PDone apply(PCollection<Entity> input) {
- return input.apply(
- org.apache.beam.sdk.io.Write.to(new DatastoreSink(projectId)));
+ input.apply(ParDo.of(new DatastoreWriterFn(projectId))); // DatastoreWriterFn commits entities as a side effect; it never calls c.output().
+ return PDone.in(input.getPipeline()); // Terminal write: there is no downstream PCollection to return.
}
@Override
@@ -733,130 +696,127 @@ public class V1Beta3 {
.addIfNotNull(DisplayData.item("projectId", projectId)
.withLabel("Output Project"));
}
- }
- /**
- * A {@link org.apache.beam.sdk.io.Sink} that writes data to Datastore.
- */
- static class DatastoreSink extends org.apache.beam.sdk.io.Sink<Entity> {
- final String projectId;
-
- public DatastoreSink(String projectId) {
- this.projectId = projectId;
- }
-
- @Override
- public void validate(PipelineOptions options) {
- checkNotNull(projectId, "projectId");
- }
-
- @Override
- public DatastoreWriteOperation createWriteOperation(PipelineOptions options) {
- return new DatastoreWriteOperation(this);
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder
- .addIfNotNull(DisplayData.item("projectId", projectId)
- .withLabel("Output Project"));
- }
- }
+ /**
+ * A {@link DoFn} that writes {@link Entity} objects to Cloud Datastore. Entities are written in
+ * batches, where the maximum batch size is {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}.
+ * Entities are committed as upsert mutations (either update if the key already exists, or
+ * insert if it is a new key). If an entity does not have a complete key (i.e., it has no name
+ * or id), the bundle will fail.
+ *
+ * <p>See <a
+ * href="https://cloud.google.com/datastore/docs/concepts/entities">
+ * Datastore: Entities, Properties, and Keys</a> for information about entity keys and entities.
+ *
+ * <p>Commits are non-transactional. If a commit fails with a {@code DatastoreException} (e.g. a
+ * conflict over an entity group), the whole batch is retried with exponential backoff, bounded
+ * by {@link DatastoreWriterFn#MAX_RETRIES}.
+ */
+ @VisibleForTesting
+ static class DatastoreWriterFn extends DoFn<Entity, Void> {
+ private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriterFn.class);
+ private final String projectId;
+ private transient Datastore datastore; // Not serialized; (re)created for each bundle in startBundle.
+ private final V1Beta3DatastoreFactory datastoreFactory;
+ // Current batch of entities to be written.
+ private final List<Entity> entities = new ArrayList<>();
+ /**
+ * Since a bundle is written in batches, we should retry the commit of a batch in order to
+ * prevent transient errors from causing the bundle to fail.
+ */
+ private static final int MAX_RETRIES = 5;
- /**
- * A {@link WriteOperation} that will manage a parallel write to a Datastore sink.
- */
- private static class DatastoreWriteOperation
- extends WriteOperation<Entity, DatastoreWriteResult> {
- private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriteOperation.class);
+ /**
+ * Initial backoff time for exponential backoff for retry attempts.
+ */
+ private static final int INITIAL_BACKOFF_MILLIS = 5000;
- private final DatastoreSink sink;
+ public DatastoreWriterFn(String projectId) {
+ this(projectId, new V1Beta3DatastoreFactory());
+ }
- public DatastoreWriteOperation(DatastoreSink sink) {
- this.sink = sink;
- }
+ @VisibleForTesting
+ DatastoreWriterFn(String projectId, V1Beta3DatastoreFactory datastoreFactory) {
+ this.projectId = checkNotNull(projectId, "projectId");
+ this.datastoreFactory = datastoreFactory;
+ }
- @Override
- public Coder<DatastoreWriteResult> getWriterResultCoder() {
- return SerializableCoder.of(DatastoreWriteResult.class);
- }
+ @StartBundle
+ public void startBundle(Context c) {
+ datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId);
+ }
- @Override
- public void initialize(PipelineOptions options) throws Exception {}
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ // Verify that the entity to write has a complete key.
+ if (!isValidKey(c.element().getKey())) {
+ throw new IllegalArgumentException(
+ "Entities to be written to the Datastore must have complete keys");
+ }
- /**
- * Finalizes the write. Logs the number of entities written to the Datastore.
- */
- @Override
- public void finalize(Iterable<DatastoreWriteResult> writerResults, PipelineOptions options)
- throws Exception {
- long totalEntities = 0;
- for (DatastoreWriteResult result : writerResults) {
- totalEntities += result.entitiesWritten;
+ entities.add(c.element());
+ if (entities.size() >= V1Beta3.DATASTORE_BATCH_UPDATE_LIMIT) {
+ flushBatch();
+ }
}
- LOG.info("Wrote {} elements.", totalEntities);
- }
- @Override
- public DatastoreWriter createWriter(PipelineOptions options) throws Exception {
- DatastoreOptions.Builder builder =
- new DatastoreOptions.Builder()
- .projectId(sink.projectId)
- .initializer(new RetryHttpRequestInitializer());
- Credential credential = options.as(GcpOptions.class).getGcpCredential();
- if (credential != null) {
- builder.credential(credential);
+ @FinishBundle
+ public void finishBundle(Context c) throws Exception {
+ if (entities.size() > 0) {
+ flushBatch();
+ }
}
- Datastore datastore = DatastoreFactory.get().create(builder.build());
- return new DatastoreWriter(this, datastore);
- }
+ /**
+ * Writes a batch of entities to Cloud Datastore.
+ *
+ * <p>If a commit fails, it will be retried (up to {@link DatastoreWriterFn#MAX_RETRIES}
+ * times). All entities in the batch will be committed again, even if the commit was partially
+ * successful. If the retry limit is exceeded, the last exception from the Datastore will be
+ * thrown.
+ *
+ * @throws DatastoreException if the commit fails or IOException or InterruptedException if
+ * backing off between retries fails.
+ */
+ private void flushBatch() throws DatastoreException, IOException, InterruptedException {
+ LOG.debug("Writing batch of {} entities", entities.size());
+ Sleeper sleeper = Sleeper.DEFAULT;
+ BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
+
+ while (true) {
+ // Batch upsert entities.
+ try {
+ CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+ for (Entity entity: entities) {
+ commitRequest.addMutations(makeUpsert(entity));
+ }
+ commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+ datastore.commit(commitRequest.build());
+ // Break if the commit threw no exception.
+ break;
+ } catch (DatastoreException exception) {
+ // Only log the code and message for potentially-transient errors. The entire exception
+ // will be propagated upon the last retry.
+ LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
+ exception.getMessage());
+ if (!BackOffUtils.next(sleeper, backoff)) {
+ LOG.error("Aborting after {} retries.", MAX_RETRIES);
+ throw exception;
+ }
+ }
+ }
+ LOG.debug("Successfully wrote {} entities", entities.size());
+ entities.clear();
+ }
- @Override
- public DatastoreSink getSink() {
- return sink;
+ @Override
+ public void populateDisplayData(Builder builder) {
+ super.populateDisplayData(builder);
+ builder
+ .addIfNotNull(DisplayData.item("projectId", projectId)
+ .withLabel("Output Project"));
+ }
}
- }
-
- /**
- * {@link Writer} that writes entities to a Datastore Sink. Entities are written in batches,
- * where the maximum batch size is {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}. Entities
- * are committed as upsert mutations (either update if the key already exists, or insert if it is
- * a new key). If an entity does not have a complete key (i.e., it has no name or id), the bundle
- * will fail.
- *
- * <p>See <a
- * href="https://cloud.google.com/datastore/docs/concepts/entities#Datastore_Creating_an_entity">
- * Datastore: Entities, Properties, and Keys</a> for information about entity keys and upsert
- * mutations.
- *
- * <p>Commits are non-transactional. If a commit fails because of a conflict over an entity
- * group, the commit will be retried (up to {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}
- * times).
- *
- * <p>Visible for testing purposes.
- */
- @VisibleForTesting
- static class DatastoreWriter extends Writer<Entity, DatastoreWriteResult> {
- private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriter.class);
- private final DatastoreWriteOperation writeOp;
- private final Datastore datastore;
- private long totalWritten = 0;
-
- // Visible for testing.
- final List<Entity> entities = new ArrayList<>();
-
- /**
- * Since a bundle is written in batches, we should retry the commit of a batch in order to
- * prevent transient errors from causing the bundle to fail.
- */
- private static final int MAX_RETRIES = 5;
-
- /**
- * Initial backoff time for exponential backoff for retry attempts.
- */
- private static final int INITIAL_BACKOFF_MILLIS = 5000;
/**
* Returns true if a Datastore key is complete. A key is complete if its last element
@@ -870,100 +830,38 @@ public class V1Beta3 {
PathElement lastElement = elementList.get(elementList.size() - 1);
return (lastElement.getId() != 0 || !lastElement.getName().isEmpty());
}
+ }
- DatastoreWriter(DatastoreWriteOperation writeOp, Datastore datastore) {
- this.writeOp = writeOp;
- this.datastore = datastore;
- }
-
- @Override
- public void open(String uId) throws Exception {}
-
- /**
- * Writes an entity to the Datastore. Writes are batched, up to {@link
- * V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}. If an entity does not have a complete key, an
- * {@link IllegalArgumentException} will be thrown.
- */
- @Override
- public void write(Entity value) throws Exception {
- // Verify that the entity to write has a complete key.
- if (!isValidKey(value.getKey())) {
- throw new IllegalArgumentException(
- "Entities to be written to the Datastore must have complete keys");
- }
-
- entities.add(value);
+ /**
+ * A serializable wrapper factory for the Datastore singleton classes {@link DatastoreFactory}
+ * and {@link QuerySplitter}.
+ *
+ * <p>{@link DatastoreFactory} and {@link QuerySplitter} are not Java-serializable, so they are
+ * obtained through this class, which implements {@link Serializable}.
+ */
+ @VisibleForTesting
+ static class V1Beta3DatastoreFactory implements Serializable {
- if (entities.size() >= V1Beta3.DATASTORE_BATCH_UPDATE_LIMIT) {
- flushBatch();
- }
- }
+ /** Builds a Datastore client for the given pipeline options and project. */
+ public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) {
+ DatastoreOptions.Builder builder =
+ new DatastoreOptions.Builder()
+ .projectId(projectId)
+ .initializer(
+ new RetryHttpRequestInitializer()
+ );
- /**
- * Flushes any pending batch writes and returns a DatastoreWriteResult.
- */
- @Override
- public DatastoreWriteResult close() throws Exception {
- if (entities.size() > 0) {
- flushBatch();
+ Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
+ if (credential != null) {
+ builder.credential(credential);
}
- return new DatastoreWriteResult(totalWritten);
- }
- @Override
- public DatastoreWriteOperation getWriteOperation() {
- return writeOp;
+ return DatastoreFactory.get().create(builder.build());
}
- /**
- * Writes a batch of entities to the Datastore.
- *
- * <p>If a commit fails, it will be retried (up to {@link DatastoreWriter#MAX_RETRIES}
- * times). All entities in the batch will be committed again, even if the commit was partially
- * successful. If the retry limit is exceeded, the last exception from the Datastore will be
- * thrown.
- *
- * @throws DatastoreException if the commit fails or IOException or InterruptedException if
- * backing off between retries fails.
- */
- private void flushBatch() throws DatastoreException, IOException, InterruptedException {
- LOG.debug("Writing batch of {} entities", entities.size());
- Sleeper sleeper = Sleeper.DEFAULT;
- BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
-
- while (true) {
- // Batch upsert entities.
- try {
- CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
- for (Entity entity: entities) {
- commitRequest.addMutations(makeUpsert(entity));
- }
- commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
- datastore.commit(commitRequest.build());
- // Break if the commit threw no exception.
- break;
- } catch (DatastoreException exception) {
- // Only log the code and message for potentially-transient errors. The entire exception
- // will be propagated upon the last retry.
- LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
- exception.getMessage());
- if (!BackOffUtils.next(sleeper, backoff)) {
- LOG.error("Aborting after {} retries.", MAX_RETRIES);
- throw exception;
- }
- }
- }
- totalWritten += entities.size();
- LOG.debug("Successfully wrote {} entities", entities.size());
- entities.clear();
- }
- }
-
- private static class DatastoreWriteResult implements Serializable {
- final long entitiesWritten;
-
- public DatastoreWriteResult(long recordsWritten) {
- this.entitiesWritten = recordsWritten;
+ /** Builds a Datastore {@link QuerySplitter}. */
+ public QuerySplitter getQuerySplitter() {
+ return DatastoreHelper.getQuerySplitter();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0361ae9/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
index 9947c60..8fa34da 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.gcp.datastore;
+import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DATASTORE_BATCH_UPDATE_LIMIT;
import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.DEFAULT_BUNDLE_SIZE_BYTES;
import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.QUERY_BATCH_LIMIT;
import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.getEstimatedSizeBytes;
@@ -27,8 +28,8 @@ import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING;
import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;
import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert;
import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
-import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -44,11 +45,12 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DatastoreWriter;
import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.ReadFn;
import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.SplitQueryFn;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.V1Beta3DatastoreFactory;
import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.V1Beta3Options;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.V1Beta3DatastoreFactory;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Write;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Write.DatastoreWriterFn;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.transforms.DoFnTester;
@@ -61,7 +63,7 @@ import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.POutput;
-import com.google.common.collect.Lists;
+import com.google.datastore.v1beta3.CommitRequest;
import com.google.datastore.v1beta3.Entity;
import com.google.datastore.v1beta3.EntityResult;
import com.google.datastore.v1beta3.Key;
@@ -87,7 +89,6 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -285,33 +286,33 @@ public class V1Beta3Test {
Key key;
// Complete with name, no ancestor
key = makeKey("bird", "finch").build();
- assertTrue(DatastoreWriter.isValidKey(key));
+ assertTrue(Write.isValidKey(key));
// Complete with id, no ancestor
key = makeKey("bird", 123).build();
- assertTrue(DatastoreWriter.isValidKey(key));
+ assertTrue(Write.isValidKey(key));
// Incomplete, no ancestor
key = makeKey("bird").build();
- assertFalse(DatastoreWriter.isValidKey(key));
+ assertFalse(Write.isValidKey(key));
// Complete with name and ancestor
key = makeKey("bird", "owl").build();
key = makeKey(key, "bird", "horned").build();
- assertTrue(DatastoreWriter.isValidKey(key));
+ assertTrue(Write.isValidKey(key));
// Complete with id and ancestor
key = makeKey("bird", "owl").build();
key = makeKey(key, "bird", 123).build();
- assertTrue(DatastoreWriter.isValidKey(key));
+ assertTrue(Write.isValidKey(key));
// Incomplete with ancestor
key = makeKey("bird", "owl").build();
key = makeKey(key, "bird").build();
- assertFalse(DatastoreWriter.isValidKey(key));
+ assertFalse(Write.isValidKey(key));
key = makeKey().build();
- assertFalse(DatastoreWriter.isValidKey(key));
+ assertFalse(Write.isValidKey(key));
}
/**
@@ -321,35 +322,62 @@ public class V1Beta3Test {
public void testAddEntitiesWithIncompleteKeys() throws Exception {
Key key = makeKey("bird").build();
Entity entity = Entity.newBuilder().setKey(key).build();
- DatastoreWriter writer = new DatastoreWriter(null, mockDatastore);
+ DatastoreWriterFn writer = new DatastoreWriterFn(PROJECT_ID, mockDatastoreFactory);
+ DoFnTester<Entity, Void> doFnTester = DoFnTester.of(writer);
+ doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Entities to be written to the Datastore must have complete keys");
- writer.write(entity);
+ doFnTester.processBundle(entity);
+ }
+
+ /** Tests {@link DatastoreWriterFn} with fewer entities than a single write batch. */
+ @Test
+ public void testDatastoreWriterFnWithOneBatch() throws Exception {
+ datastoreWriterFnTest(100);
+ }
+
+ /** Tests {@link DatastoreWriterFn} with several batches where the last one is only partial. */
+ @Test
+ public void testDatastoreWriterFnWithMultipleBatches() throws Exception {
+ datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 3 + 100);
}
/**
- * Test that entities are added to the batch to update.
+ * Tests {@link DatastoreWriterFn} with several full batches, i.e. an entity count that is an
+ * exact multiple of the write batch size.
*/
@Test
- public void testAddingEntities() throws Exception {
- List<Entity> expected = Lists.newArrayList(
- Entity.newBuilder().setKey(makeKey("bird", "jay").build()).build(),
- Entity.newBuilder().setKey(makeKey("bird", "condor").build()).build(),
- Entity.newBuilder().setKey(makeKey("bird", "robin").build()).build());
-
- List<Entity> allEntities = Lists.newArrayList(expected);
- Collections.shuffle(allEntities);
-
- DatastoreWriter writer = new DatastoreWriter(null, mockDatastore);
- writer.open("test_id");
- for (Entity entity : allEntities) {
- writer.write(entity);
+ public void testDatastoreWriterFnWithBatchesExactMultiple() throws Exception {
+ datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 2);
+ }
+
+ // Runs numEntities through DatastoreWriterFn and verifies one commit per expected batch.
+ private void datastoreWriterFnTest(int numEntities) throws Exception {
+ // Create the requested number of entities, each with a complete (kind, id) key.
+ List<Entity> entities = new ArrayList<>(numEntities);
+ for (int i = 0; i < numEntities; ++i) {
+ entities.add(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build());
}
- assertEquals(expected.size(), writer.entities.size());
- assertThat(writer.entities, containsInAnyOrder(expected.toArray()));
+ DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, mockDatastoreFactory);
+ DoFnTester<Entity, Void> doFnTester = DoFnTester.of(datastoreWriter);
+ doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+ doFnTester.processBundle(entities);
+
+ int start = 0;
+ while (start < numEntities) {
+ int end = Math.min(numEntities, start + DATASTORE_BATCH_UPDATE_LIMIT);
+ CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+ commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+ for (Entity entity: entities.subList(start, end)) {
+ commitRequest.addMutations(makeUpsert(entity));
+ }
+ // Verify each batch was committed exactly once, with exactly the entities of that batch.
+ verify(mockDatastore, times(1)).commit(commitRequest.build());
+ start = end;
+ }
}
/**