You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/19 22:42:37 UTC
[1/2] incubator-beam git commit: Datastore Sink support for writing
Mutations
Repository: incubator-beam
Updated Branches:
refs/heads/master 6645dcd4a -> 78fda4513
Datastore Sink support for writing Mutations
This generalizes Write to Write and Delete cleanly.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/95330658
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/95330658
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/95330658
Branch: refs/heads/master
Commit: 953306584073044c41bcfdc4ea5e14870ddea5e4
Parents: 6645dcd
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Wed Aug 17 18:19:52 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Aug 19 15:41:53 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/datastore/V1Beta3.java | 436 +++++++++++++------
.../beam/sdk/io/gcp/datastore/V1Beta3Test.java | 237 ++++++++--
2 files changed, 508 insertions(+), 165 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/95330658/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
index 0d2e2cb..8503b66 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
@@ -24,6 +24,7 @@ import static com.google.common.base.Verify.verify;
import static com.google.datastore.v1beta3.PropertyFilter.Operator.EQUAL;
import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING;
import static com.google.datastore.v1beta3.QueryResultBatch.MoreResultsType.NOT_FINISHED;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeDelete;
import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder;
import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert;
@@ -36,8 +37,10 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
@@ -60,6 +63,7 @@ import com.google.datastore.v1beta3.Entity;
import com.google.datastore.v1beta3.EntityResult;
import com.google.datastore.v1beta3.Key;
import com.google.datastore.v1beta3.Key.PathElement;
+import com.google.datastore.v1beta3.Mutation;
import com.google.datastore.v1beta3.PartitionId;
import com.google.datastore.v1beta3.Query;
import com.google.datastore.v1beta3.QueryResultBatch;
@@ -84,7 +88,7 @@ import java.util.NoSuchElementException;
import javax.annotation.Nullable;
/**
- * <p>{@link V1Beta3} provides an API to Read and Write {@link PCollection PCollections} of
+ * <p>{@link V1Beta3} provides an API to Read, Write and Delete {@link PCollection PCollections} of
* <a href="https://developers.google.com/datastore/">Google Cloud Datastore</a> version v1beta3
* {@link Entity} objects.
*
@@ -129,7 +133,25 @@ import javax.annotation.Nullable;
* p.run();
* } </pre>
*
- * <p>{@link Entity Entities} in the {@code PCollection} to be written must have complete
+ * <p>To delete a {@link PCollection} of {@link Entity Entities} from Datastore, use
+ * {@link V1Beta3#deleteEntity()}, specifying the Cloud Datastore project to write to:
+ *
+ * <pre> {@code
+ * PCollection<Entity> entities = ...;
+ * entities.apply(DatastoreIO.v1beta3().deleteEntity().withProjectId(projectId));
+ * p.run();
+ * } </pre>
+ *
+ * <p>To delete entities associated with a {@link PCollection} of {@link Key Keys} from Datastore,
+ * use {@link V1Beta3#deleteKey}, specifying the Cloud Datastore project to write to:
+ *
+ * <pre> {@code
+ * PCollection<Entity> entities = ...;
+ * entities.apply(DatastoreIO.v1beta3().deleteKey().withProjectId(projectId));
+ * p.run();
+ * } </pre>
+ *
+ * <p>{@link Entity Entities} in the {@code PCollection} to be written or deleted must have complete
* {@link Key Keys}. Complete {@code Keys} specify the {@code name} and {@code id} of the
* {@code Entity}, where incomplete {@code Keys} do not. A {@code namespace} other than
* {@code projectId} default may be used by specifying it in the {@code Entity} {@code Keys}.
@@ -139,9 +161,9 @@ import javax.annotation.Nullable;
* keyBuilder.getPartitionIdBuilder().setNamespace(namespace);
* }</pre>
*
- * <p>{@code Entities} will be committed as upsert (update or insert) mutations. Please read
- * <a href="https://cloud.google.com/datastore/docs/concepts/entities">Entities, Properties, and
- * Keys</a> for more information about {@code Entity} keys.
+ * <p>{@code Entities} will be committed as upsert (update or insert) or delete mutations. Please
+ * read <a href="https://cloud.google.com/datastore/docs/concepts/entities">Entities, Properties,
+ * and Keys</a> for more information about {@code Entity} keys.
*
* <p><h3>Permissions</h3>
* Permission requirements depend on the {@code PipelineRunner} that is used to execute the
@@ -642,20 +664,33 @@ public class V1Beta3 {
}
/**
+ * Returns an empty {@link DeleteEntity} builder. Configure the destination
+ * {@code projectId} using {@link DeleteEntity#withProjectId}.
+ */
+ public DeleteEntity deleteEntity() {
+ return new DeleteEntity(null);
+ }
+
+ /**
+ * Returns an empty {@link DeleteKey} builder. Configure the destination
+ * {@code projectId} using {@link DeleteKey#withProjectId}.
+ */
+ public DeleteKey deleteKey() {
+ return new DeleteKey(null);
+ }
+
+ /**
* A {@link PTransform} that writes {@link Entity} objects to Cloud Datastore.
*
* @see DatastoreIO
*/
- public static class Write extends PTransform<PCollection<Entity>, PDone> {
- @Nullable
- private final String projectId;
-
+ public static class Write extends Mutate<Entity> {
/**
* Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if
* it is {@code null} at instantiation time, an error will be thrown.
*/
- public Write(@Nullable String projectId) {
- this.projectId = projectId;
+ Write(@Nullable String projectId) {
+ super(projectId, new UpsertFn());
}
/**
@@ -665,27 +700,99 @@ public class V1Beta3 {
checkNotNull(projectId, "projectId");
return new Write(projectId);
}
+ }
- @Override
- public PDone apply(PCollection<Entity> input) {
- input.apply(ParDo.of(new DatastoreWriterFn(projectId)));
- return PDone.in(input.getPipeline());
+ /**
+ * A {@link PTransform} that deletes {@link Entity Entities} from Cloud Datastore.
+ *
+ * @see DatastoreIO
+ */
+ public static class DeleteEntity extends Mutate<Entity> {
+ /**
+ * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if
+ * it is {@code null} at instantiation time, an error will be thrown.
+ */
+ DeleteEntity(@Nullable String projectId) {
+ super(projectId, new DeleteEntityFn());
}
- @Override
- public void validate(PCollection<Entity> input) {
+ /**
+ * Returns a new {@link DeleteEntity} that deletes entities from the Cloud Datastore for the
+ * specified project.
+ */
+ public DeleteEntity withProjectId(String projectId) {
+ checkNotNull(projectId, "projectId");
+ return new DeleteEntity(projectId);
+ }
+ }
+
+ /**
+ * A {@link PTransform} that deletes {@link Entity Entities} associated with the given
+ * {@link Key Keys} from Cloud Datastore.
+ *
+ * @see DatastoreIO
+ */
+ public static class DeleteKey extends Mutate<Key> {
+ /**
+ * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if
+ * it is {@code null} at instantiation time, an error will be thrown.
+ */
+ DeleteKey(@Nullable String projectId) {
+ super(projectId, new DeleteKeyFn());
+ }
+
+ /**
+ * Returns a new {@link DeleteKey} that deletes entities from the Cloud Datastore for the
+ * specified project.
+ */
+ public DeleteKey withProjectId(String projectId) {
checkNotNull(projectId, "projectId");
+ return new DeleteKey(projectId);
}
+ }
+ /**
+ * A {@link PTransform} that writes mutations to Cloud Datastore.
+ *
+ * <p>It requires a {@link SimpleFunction} that transforms an object of type {@code T} to a
+ * {@link Mutation}. {@code T} is usually either an {@link Entity} or a {@link Key}.
+ * <p><b>Note:</b> Only idempotent Cloud Datastore mutation operations (upsert and delete) should
+ * be used by the provided {@code SimpleFunction}, as commits are retried when failures occur.
+ */
+ private abstract static class Mutate<T> extends PTransform<PCollection<T>, PDone> {
@Nullable
- public String getProjectId() {
- return projectId;
+ private final String projectId;
+ /** A function that transforms each {@code T} into a mutation. */
+ private final SimpleFunction<T, Mutation> mutationFn;
+
+ /**
+ * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if
+ * it is {@code null} at instantiation time, an error will be thrown.
+ */
+ Mutate(@Nullable String projectId, SimpleFunction<T, Mutation> mutationFn) {
+ this.projectId = projectId;
+ this.mutationFn = checkNotNull(mutationFn);
+ }
+
+ @Override
+ public PDone apply(PCollection<T> input) {
+ input.apply("Convert to Mutation", MapElements.via(mutationFn))
+ .apply("Write Mutation to Datastore", ParDo.of(new DatastoreWriterFn(projectId)));
+
+ return PDone.in(input.getPipeline());
+ }
+
+ @Override
+ public void validate(PCollection<T> input) {
+ checkNotNull(projectId, "projectId");
+ checkNotNull(mutationFn, "mutationFn");
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("projectId", projectId)
+ .add("mutationFn", mutationFn.getClass().getName())
.toString();
}
@@ -694,141 +801,200 @@ public class V1Beta3 {
super.populateDisplayData(builder);
builder
.addIfNotNull(DisplayData.item("projectId", projectId)
- .withLabel("Output Project"));
+ .withLabel("Output Project"))
+ .include(mutationFn);
}
+ public String getProjectId() {
+ return projectId;
+ }
+ }
+
+ /**
+ * {@link DoFn} that writes {@link Mutation}s to Cloud Datastore. Mutations are written in
+ * batches, where the maximum batch size is {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}.
+ *
+ * <p>See <a
+ * href="https://cloud.google.com/datastore/docs/concepts/entities">
+ * Datastore: Entities, Properties, and Keys</a> for information about entity keys and mutations.
+ *
+ * <p>Commits are non-transactional. If a commit fails because of a conflict over an entity
+ * group, the commit will be retried (up to {@link DatastoreWriterFn#MAX_RETRIES}
+ * times). This means that the mutation operation should be idempotent. Thus, the writer should
+ * only be used for {@code upsert} and {@code delete} mutation operations, as these are the only
+ * two Cloud Datastore mutations that are idempotent.
+ */
+ @VisibleForTesting
+ static class DatastoreWriterFn extends DoFn<Mutation, Void> {
+ private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriterFn.class);
+ private final String projectId;
+ private transient Datastore datastore;
+ private final V1Beta3DatastoreFactory datastoreFactory;
+ // Current batch of mutations to be written.
+ private final List<Mutation> mutations = new ArrayList<>();
+ /**
+ * Since a bundle is written in batches, we should retry the commit of a batch in order to
+ * prevent transient errors from causing the bundle to fail.
+ */
+ private static final int MAX_RETRIES = 5;
+
/**
- * A {@link DoFn} that writes {@link Entity} objects to Cloud Datastore. Entities are written in
- * batches, where the maximum batch size is {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}.
- * Entities are committed as upsert mutations (either update if the key already exists, or
- * insert if it is a new key). If an entity does not have a complete key (i.e., it has no name
- * or id), the bundle will fail.
- *
- * <p>See <a
- * href="https://cloud.google.com/datastore/docs/concepts/entities">
- * Datastore: Entities, Properties, and Keys</a> for information about entity keys and entities.
- *
- * <p>Commits are non-transactional. If a commit fails because of a conflict over an entity
- * group, the commit will be retried (up to {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}
- * times).
+ * Initial backoff time for exponential backoff for retry attempts.
*/
- @VisibleForTesting
- static class DatastoreWriterFn extends DoFn<Entity, Void> {
- private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriterFn.class);
- private final String projectId;
- private transient Datastore datastore;
- private final V1Beta3DatastoreFactory datastoreFactory;
- // Current batch of entities to be written.
- private final List<Entity> entities = new ArrayList<>();
- /**
- * Since a bundle is written in batches, we should retry the commit of a batch in order to
- * prevent transient errors from causing the bundle to fail.
- */
- private static final int MAX_RETRIES = 5;
+ private static final int INITIAL_BACKOFF_MILLIS = 5000;
- /**
- * Initial backoff time for exponential backoff for retry attempts.
- */
- private static final int INITIAL_BACKOFF_MILLIS = 5000;
+ DatastoreWriterFn(String projectId) {
+ this(projectId, new V1Beta3DatastoreFactory());
+ }
- public DatastoreWriterFn(String projectId) {
- this(projectId, new V1Beta3DatastoreFactory());
- }
+ @VisibleForTesting
+ DatastoreWriterFn(String projectId, V1Beta3DatastoreFactory datastoreFactory) {
+ this.projectId = checkNotNull(projectId, "projectId");
+ this.datastoreFactory = datastoreFactory;
+ }
- @VisibleForTesting
- DatastoreWriterFn(String projectId, V1Beta3DatastoreFactory datastoreFactory) {
- this.projectId = checkNotNull(projectId, "projectId");
- this.datastoreFactory = datastoreFactory;
- }
+ @StartBundle
+ public void startBundle(Context c) {
+ datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId);
+ }
- @StartBundle
- public void startBundle(Context c) {
- datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId);
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ mutations.add(c.element());
+ if (mutations.size() >= V1Beta3.DATASTORE_BATCH_UPDATE_LIMIT) {
+ flushBatch();
}
+ }
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- // Verify that the entity to write has a complete key.
- if (!isValidKey(c.element().getKey())) {
- throw new IllegalArgumentException(
- "Entities to be written to the Datastore must have complete keys");
- }
-
- entities.add(c.element());
- if (entities.size() >= V1Beta3.DATASTORE_BATCH_UPDATE_LIMIT) {
- flushBatch();
- }
+ @FinishBundle
+ public void finishBundle(Context c) throws Exception {
+ if (mutations.size() > 0) {
+ flushBatch();
}
+ }
- @FinishBundle
- public void finishBundle(Context c) throws Exception {
- if (entities.size() > 0) {
- flushBatch();
- }
- }
+ /**
+ * Writes a batch of mutations to Cloud Datastore.
+ *
+ * <p>If a commit fails, it will be retried (up to {@link DatastoreWriterFn#MAX_RETRIES}
+ * times). All mutations in the batch will be committed again, even if the commit was partially
+ * successful. If the retry limit is exceeded, the last exception from the Datastore will be
+ * thrown.
+ *
+ * @throws DatastoreException if the commit fails or IOException or InterruptedException if
+ * backing off between retries fails.
+ */
+ private void flushBatch() throws DatastoreException, IOException, InterruptedException {
+ LOG.debug("Writing batch of {} mutations", mutations.size());
+ Sleeper sleeper = Sleeper.DEFAULT;
+ BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
- /**
- * Writes a batch of entities to Cloud Datastore.
- *
- * <p>If a commit fails, it will be retried (up to {@link DatastoreWriterFn#MAX_RETRIES}
- * times). All entities in the batch will be committed again, even if the commit was partially
- * successful. If the retry limit is exceeded, the last exception from the Datastore will be
- * thrown.
- *
- * @throws DatastoreException if the commit fails or IOException or InterruptedException if
- * backing off between retries fails.
- */
- private void flushBatch() throws DatastoreException, IOException, InterruptedException {
- LOG.debug("Writing batch of {} entities", entities.size());
- Sleeper sleeper = Sleeper.DEFAULT;
- BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
-
- while (true) {
- // Batch upsert entities.
- try {
- CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
- for (Entity entity: entities) {
- commitRequest.addMutations(makeUpsert(entity));
- }
- commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
- datastore.commit(commitRequest.build());
- // Break if the commit threw no exception.
- break;
- } catch (DatastoreException exception) {
- // Only log the code and message for potentially-transient errors. The entire exception
- // will be propagated upon the last retry.
- LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
- exception.getMessage());
- if (!BackOffUtils.next(sleeper, backoff)) {
- LOG.error("Aborting after {} retries.", MAX_RETRIES);
- throw exception;
- }
+ while (true) {
+ // Commit the current batch of mutations (upserts and/or deletes) in a single request.
+ try {
+ CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+ commitRequest.addAllMutations(mutations);
+ commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+ datastore.commit(commitRequest.build());
+ // Break if the commit threw no exception.
+ break;
+ } catch (DatastoreException exception) {
+ // Only log the code and message for potentially-transient errors. The entire exception
+ // will be propagated upon the last retry.
+ LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
+ exception.getMessage());
+ if (!BackOffUtils.next(sleeper, backoff)) {
+ LOG.error("Aborting after {} retries.", MAX_RETRIES);
+ throw exception;
}
}
- LOG.debug("Successfully wrote {} entities", entities.size());
- entities.clear();
}
+ LOG.debug("Successfully wrote {} mutations", mutations.size());
+ mutations.clear();
+ }
- @Override
- public void populateDisplayData(Builder builder) {
- super.populateDisplayData(builder);
- builder
- .addIfNotNull(DisplayData.item("projectId", projectId)
- .withLabel("Output Project"));
- }
+ @Override
+ public void populateDisplayData(Builder builder) {
+ super.populateDisplayData(builder);
+ builder
+ .addIfNotNull(DisplayData.item("projectId", projectId)
+ .withLabel("Output Project"));
}
+ }
- /**
- * Returns true if a Datastore key is complete. A key is complete if its last element
- * has either an id or a name.
- */
- static boolean isValidKey(Key key) {
- List<PathElement> elementList = key.getPathList();
- if (elementList.isEmpty()) {
- return false;
- }
- PathElement lastElement = elementList.get(elementList.size() - 1);
- return (lastElement.getId() != 0 || !lastElement.getName().isEmpty());
+ /**
+ * Returns true if a Datastore key is complete. A key is complete if its last element
+ * has either an id or a name.
+ */
+ static boolean isValidKey(Key key) {
+ List<PathElement> elementList = key.getPathList();
+ if (elementList.isEmpty()) {
+ return false;
+ }
+ PathElement lastElement = elementList.get(elementList.size() - 1);
+ return (lastElement.getId() != 0 || !lastElement.getName().isEmpty());
+ }
+
+ /**
+ * A function that constructs an upsert {@link Mutation} from an {@link Entity} with a complete key.
+ */
+ @VisibleForTesting
+ static class UpsertFn extends SimpleFunction<Entity, Mutation> {
+ @Override
+ public Mutation apply(Entity entity) {
+ // Upserts require a complete key; checkArgument throws IllegalArgumentException otherwise.
+ checkArgument(isValidKey(entity.getKey()),
+ "Entities to be written to the Datastore must have complete keys:\n%s", entity);
+
+ return makeUpsert(entity).build();
+ }
+
+ @Override
+ public void populateDisplayData(Builder builder) {
+ builder.add(DisplayData.item("upsertFn", this.getClass())
+ .withLabel("Create Upsert Mutation"));
+ }
+ }
+
+ /**
+ * A function that constructs a delete {@link Mutation} from the {@link Key} of an {@link Entity}.
+ */
+ @VisibleForTesting
+ static class DeleteEntityFn extends SimpleFunction<Entity, Mutation> {
+ @Override
+ public Mutation apply(Entity entity) {
+ // Only the entity's key is used for the delete, so that key must be complete.
+ checkArgument(isValidKey(entity.getKey()),
+ "Entities to be deleted from the Datastore must have complete keys:\n%s", entity);
+
+ return makeDelete(entity.getKey()).build();
+ }
+
+ @Override
+ public void populateDisplayData(Builder builder) {
+ builder.add(DisplayData.item("deleteEntityFn", this.getClass())
+ .withLabel("Create Delete Mutation"));
+ }
+ }
+
+ /**
+ * A function that constructs a delete {@link Mutation} from a {@link Key}.
+ */
+ @VisibleForTesting
+ static class DeleteKeyFn extends SimpleFunction<Key, Mutation> {
+ @Override
+ public Mutation apply(Key key) {
+ // Verify that the key to delete is complete (its last path element has an id or a name).
+ checkArgument(isValidKey(key),
+ "Keys to be deleted from the Datastore must be complete:\n%s", key);
+
+ return makeDelete(key).build();
+ }
+
+ @Override
+ public void populateDisplayData(Builder builder) {
+ builder.add(DisplayData.item("deleteKeyFn", this.getClass())
+ .withLabel("Create Delete Mutation"));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/95330658/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
index 8fa34da..b0c6c18 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
@@ -22,9 +22,11 @@ import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.DEFAULT_BUNDLE_S
import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.QUERY_BATCH_LIMIT;
import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.getEstimatedSizeBytes;
import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.makeRequest;
+import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.isValidKey;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static com.google.datastore.v1beta3.PropertyFilter.Operator.EQUAL;
import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeDelete;
import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;
import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder;
@@ -45,12 +47,17 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DatastoreWriterFn;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteEntity;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteEntityFn;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteKey;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteKeyFn;
import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.ReadFn;
import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.SplitQueryFn;
import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.V1Beta3Options;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.UpsertFn;
import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.V1Beta3DatastoreFactory;
import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Write;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Write.DatastoreWriterFn;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.transforms.DoFnTester;
@@ -67,6 +74,7 @@ import com.google.datastore.v1beta3.CommitRequest;
import com.google.datastore.v1beta3.Entity;
import com.google.datastore.v1beta3.EntityResult;
import com.google.datastore.v1beta3.Key;
+import com.google.datastore.v1beta3.Mutation;
import com.google.datastore.v1beta3.PartitionId;
import com.google.datastore.v1beta3.Query;
import com.google.datastore.v1beta3.QueryResultBatch;
@@ -233,7 +241,7 @@ public class V1Beta3Test {
@Test
public void testWriteValidationFailsWithNoProject() throws Exception {
- V1Beta3.Write write = DatastoreIO.v1beta3().write();
+ Write write = DatastoreIO.v1beta3().write();
thrown.expect(NullPointerException.class);
thrown.expectMessage("projectId");
@@ -242,15 +250,14 @@ public class V1Beta3Test {
}
@Test
- public void testSinkValidationSucceedsWithProject() throws Exception {
- V1Beta3.Write write = DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID);
+ public void testWriteValidationSucceedsWithProject() throws Exception {
+ Write write = DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID);
write.validate(null);
}
@Test
public void testWriteDisplayData() {
- V1Beta3.Write write = DatastoreIO.v1beta3().write()
- .withProjectId(PROJECT_ID);
+ Write write = DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID);
DisplayData displayData = DisplayData.from(write);
@@ -258,8 +265,74 @@ public class V1Beta3Test {
}
@Test
+ public void testDeleteEntityDoesNotAllowNullProject() throws Exception {
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("projectId");
+
+ DatastoreIO.v1beta3().deleteEntity().withProjectId(null);
+ }
+
+ @Test
+ public void testDeleteEntityValidationFailsWithNoProject() throws Exception {
+ DeleteEntity deleteEntity = DatastoreIO.v1beta3().deleteEntity();
+
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("projectId");
+
+ deleteEntity.validate(null);
+ }
+
+ @Test
+ public void testDeleteEntityValidationSucceedsWithProject() throws Exception {
+ DeleteEntity deleteEntity = DatastoreIO.v1beta3().deleteEntity().withProjectId(PROJECT_ID);
+ deleteEntity.validate(null);
+ }
+
+ @Test
+ public void testDeleteEntityDisplayData() {
+ DeleteEntity deleteEntity = DatastoreIO.v1beta3().deleteEntity().withProjectId(PROJECT_ID);
+
+ DisplayData displayData = DisplayData.from(deleteEntity);
+
+ assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+ }
+
+ @Test
+ public void testDeleteKeyDoesNotAllowNullProject() throws Exception {
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("projectId");
+
+ DatastoreIO.v1beta3().deleteKey().withProjectId(null);
+ }
+
+ @Test
+ public void testDeleteKeyValidationFailsWithNoProject() throws Exception {
+ DeleteKey deleteKey = DatastoreIO.v1beta3().deleteKey();
+
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("projectId");
+
+ deleteKey.validate(null);
+ }
+
+ @Test
+ public void testDeleteKeyValidationSucceedsWithProject() throws Exception {
+ DeleteKey deleteKey = DatastoreIO.v1beta3().deleteKey().withProjectId(PROJECT_ID);
+ deleteKey.validate(null);
+ }
+
+ @Test
+ public void testDeleteKeyDisplayData() {
+ DeleteKey deleteKey = DatastoreIO.v1beta3().deleteKey().withProjectId(PROJECT_ID);
+
+ DisplayData displayData = DisplayData.from(deleteKey);
+
+ assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+ }
+
+ @Test
@Category(RunnableOnService.class)
- public void testSinkPrimitiveDisplayData() {
+ public void testWritePrimitiveDisplayData() {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
PTransform<PCollection<Entity>, ?> write =
DatastoreIO.v1beta3().write().withProjectId("myProject");
@@ -267,6 +340,39 @@ public class V1Beta3Test {
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
assertThat("DatastoreIO write should include the project in its primitive display data",
displayData, hasItem(hasDisplayItem("projectId")));
+ assertThat("DatastoreIO write should include the upsertFn in its primitive display data",
+ displayData, hasItem(hasDisplayItem("upsertFn")));
+
+ }
+
+ @Test
+ @Category(RunnableOnService.class)
+ public void testDeleteEntityPrimitiveDisplayData() {
+ DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+ PTransform<PCollection<Entity>, ?> deleteEntity =
+ DatastoreIO.v1beta3().deleteEntity().withProjectId("myProject");
+
+ Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(deleteEntity);
+ assertThat("DatastoreIO deleteEntity primitive display data should include the project",
+ displayData, hasItem(hasDisplayItem("projectId")));
+ assertThat("DatastoreIO deleteEntity primitive display data should include the deleteEntityFn",
+ displayData, hasItem(hasDisplayItem("deleteEntityFn")));
+
+ }
+
+ @Test
+ @Category(RunnableOnService.class)
+ public void testDeleteKeyPrimitiveDisplayData() {
+ DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+ PTransform<PCollection<Key>, ?> deleteKey =
+ DatastoreIO.v1beta3().deleteKey().withProjectId("myProject");
+
+ Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(deleteKey);
+ assertThat("DatastoreIO deleteKey primitive display data should include the project",
+ displayData, hasItem(hasDisplayItem("projectId")));
+ assertThat("DatastoreIO deleteKey primitive display data should include the deleteKeyFn",
+ displayData, hasItem(hasDisplayItem("deleteKeyFn")));
+
}
/**
@@ -286,33 +392,33 @@ public class V1Beta3Test {
Key key;
// Complete with name, no ancestor
key = makeKey("bird", "finch").build();
- assertTrue(Write.isValidKey(key));
+ assertTrue(isValidKey(key));
// Complete with id, no ancestor
key = makeKey("bird", 123).build();
- assertTrue(Write.isValidKey(key));
+ assertTrue(isValidKey(key));
// Incomplete, no ancestor
key = makeKey("bird").build();
- assertFalse(Write.isValidKey(key));
+ assertFalse(isValidKey(key));
// Complete with name and ancestor
key = makeKey("bird", "owl").build();
key = makeKey(key, "bird", "horned").build();
- assertTrue(Write.isValidKey(key));
+ assertTrue(isValidKey(key));
// Complete with id and ancestor
key = makeKey("bird", "owl").build();
key = makeKey(key, "bird", 123).build();
- assertTrue(Write.isValidKey(key));
+ assertTrue(isValidKey(key));
// Incomplete with ancestor
key = makeKey("bird", "owl").build();
key = makeKey(key, "bird").build();
- assertFalse(Write.isValidKey(key));
+ assertFalse(isValidKey(key));
key = makeKey().build();
- assertFalse(Write.isValidKey(key));
+ assertFalse(isValidKey(key));
}
/**
@@ -322,14 +428,86 @@ public class V1Beta3Test {
public void testAddEntitiesWithIncompleteKeys() throws Exception {
Key key = makeKey("bird").build();
Entity entity = Entity.newBuilder().setKey(key).build();
- DatastoreWriterFn writer = new DatastoreWriterFn(PROJECT_ID, mockDatastoreFactory);
- DoFnTester<Entity, Void> doFnTester = DoFnTester.of(writer);
- doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+ UpsertFn upsertFn = new UpsertFn();
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Entities to be written to the Datastore must have complete keys");
- doFnTester.processBundle(entity);
+ upsertFn.apply(entity);
+ }
+
+ /**
+ * Test that entities with valid keys are transformed to upsert mutations.
+ */
+ @Test
+ public void testAddEntities() throws Exception {
+ Key key = makeKey("bird", "finch").build();
+ Entity entity = Entity.newBuilder().setKey(key).build();
+ UpsertFn upsertFn = new UpsertFn();
+
+ Mutation expectedMutation = makeUpsert(entity).build();
+ assertEquals(expectedMutation, upsertFn.apply(entity));
+ }
+
+ /**
+ * Verifies that {@link DeleteEntityFn} rejects entities whose keys are incomplete.
+ */
+ @Test
+ public void testDeleteEntitiesWithIncompleteKeys() throws Exception {
+ Key incompleteKey = makeKey("bird").build();
+ Entity entityToDelete = Entity.newBuilder().setKey(incompleteKey).build();
+ DeleteEntityFn fn = new DeleteEntityFn();
+
+ thrown.expectMessage("Entities to be deleted from the Datastore must have complete keys");
+ thrown.expect(IllegalArgumentException.class);
+
+ fn.apply(entityToDelete);
+ }
+
+ /**
+ * Test that entities with valid keys are transformed to delete mutations.
+ */
+ @Test
+ public void testDeleteEntities() throws Exception {
+ Key key = makeKey("bird", "finch").build();
+ Entity entity = Entity.newBuilder().setKey(key).build();
+ DeleteEntityFn deleteEntityFn = new DeleteEntityFn();
+
+ Mutation expectedMutation = makeDelete(entity.getKey()).build();
+ assertEquals(expectedMutation, deleteEntityFn.apply(entity));
+ }
+
+ /**
+ * Verifies that {@link DeleteKeyFn} rejects keys that are incomplete.
+ */
+ @Test
+ public void testDeleteIncompleteKeys() throws Exception {
+ Key incompleteKey = makeKey("bird").build();
+ DeleteKeyFn fn = new DeleteKeyFn();
+
+ thrown.expectMessage("Keys to be deleted from the Datastore must be complete");
+ thrown.expect(IllegalArgumentException.class);
+
+ fn.apply(incompleteKey);
+ }
+
+ /**
+ * Test that valid keys are transformed to delete mutations.
+ */
+ @Test
+ public void testDeleteKeys() throws Exception {
+ Key key = makeKey("bird", "finch").build();
+ DeleteKeyFn deleteKeyFn = new DeleteKeyFn();
+
+ Mutation expectedMutation = makeDelete(key).build();
+ assertEquals(expectedMutation, deleteKeyFn.apply(key));
+ }
+
+ @Test
+ public void testDatastoreWriteFnDisplayData() {
+ DisplayData displayData = DisplayData.from(new DatastoreWriterFn(PROJECT_ID));
+ // The configured project id must be exposed as a display item.
+ assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
 }
/** Tests {@link DatastoreWriterFn} with entities less than one batch. */
@@ -354,27 +532,26 @@ public class V1Beta3Test {
}
// A helper method to test DatastoreWriterFn for various batch sizes.
- private void datastoreWriterFnTest(int numEntities) throws Exception {
+ private void datastoreWriterFnTest(int numMutations) throws Exception {
// Create the requested number of mutations.
- List<Entity> entities = new ArrayList<>(numEntities);
- for (int i = 0; i < numEntities; ++i) {
- entities.add(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build());
+ List<Mutation> mutations = new ArrayList<>(numMutations);
+ for (int i = 0; i < numMutations; ++i) {
+ mutations.add(
+ makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build());
}
DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, mockDatastoreFactory);
- DoFnTester<Entity, Void> doFnTester = DoFnTester.of(datastoreWriter);
+ DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
- doFnTester.processBundle(entities);
+ doFnTester.processBundle(mutations);
int start = 0;
- while (start < numEntities) {
- int end = Math.min(numEntities, start + DATASTORE_BATCH_UPDATE_LIMIT);
+ while (start < numMutations) {
+ int end = Math.min(numMutations, start + DATASTORE_BATCH_UPDATE_LIMIT);
CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
- for (Entity entity: entities.subList(start, end)) {
- commitRequest.addMutations(makeUpsert(entity));
- }
- // Verify all the batch requests were made with the expected entities.
+ commitRequest.addAllMutations(mutations.subList(start, end));
+ // Verify all the batch requests were made with the expected mutations.
verify(mockDatastore, times(1)).commit(commitRequest.build());
start = end;
}
[2/2] incubator-beam git commit: Closes #845
Posted by dh...@apache.org.
Closes #845
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/78fda451
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/78fda451
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/78fda451
Branch: refs/heads/master
Commit: 78fda4513bd14b57a9a1f8d04f784a6e61001c3e
Parents: 6645dcd 9533065
Author: Dan Halperin <dh...@google.com>
Authored: Fri Aug 19 15:42:24 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Aug 19 15:42:24 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/datastore/V1Beta3.java | 436 +++++++++++++------
.../beam/sdk/io/gcp/datastore/V1Beta3Test.java | 237 ++++++++--
2 files changed, 508 insertions(+), 165 deletions(-)
----------------------------------------------------------------------