Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/19 22:42:37 UTC

[1/2] incubator-beam git commit: Datastore Sink support for writing Mutations

Repository: incubator-beam
Updated Branches:
  refs/heads/master 6645dcd4a -> 78fda4513


Datastore Sink support for writing Mutations

This generalizes the Write transform so that Write and Delete are both expressed cleanly as Datastore mutations.
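
A minimal usage sketch of the resulting API, adapted from the updated javadoc
below (projectId and the input PCollections are placeholders):

    // Upsert entities (keys must be complete):
    PCollection<Entity> entities = ...;
    entities.apply(DatastoreIO.v1beta3().write().withProjectId(projectId));

    // Delete entities given as a PCollection<Entity> (keys must be complete):
    entities.apply(DatastoreIO.v1beta3().deleteEntity().withProjectId(projectId));

    // Delete entities given as a PCollection<Key>:
    PCollection<Key> keys = ...;
    keys.apply(DatastoreIO.v1beta3().deleteKey().withProjectId(projectId));

    p.run();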


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/95330658
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/95330658
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/95330658

Branch: refs/heads/master
Commit: 953306584073044c41bcfdc4ea5e14870ddea5e4
Parents: 6645dcd
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Wed Aug 17 18:19:52 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Aug 19 15:41:53 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/datastore/V1Beta3.java      | 436 +++++++++++++------
 .../beam/sdk/io/gcp/datastore/V1Beta3Test.java  | 237 ++++++++--
 2 files changed, 508 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/95330658/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
index 0d2e2cb..8503b66 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
@@ -24,6 +24,7 @@ import static com.google.common.base.Verify.verify;
 import static com.google.datastore.v1beta3.PropertyFilter.Operator.EQUAL;
 import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING;
 import static com.google.datastore.v1beta3.QueryResultBatch.MoreResultsType.NOT_FINISHED;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeDelete;
 import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
 import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder;
 import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert;
@@ -36,8 +37,10 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
@@ -60,6 +63,7 @@ import com.google.datastore.v1beta3.Entity;
 import com.google.datastore.v1beta3.EntityResult;
 import com.google.datastore.v1beta3.Key;
 import com.google.datastore.v1beta3.Key.PathElement;
+import com.google.datastore.v1beta3.Mutation;
 import com.google.datastore.v1beta3.PartitionId;
 import com.google.datastore.v1beta3.Query;
 import com.google.datastore.v1beta3.QueryResultBatch;
@@ -84,7 +88,7 @@ import java.util.NoSuchElementException;
 import javax.annotation.Nullable;
 
 /**
- * <p>{@link V1Beta3} provides an API to Read and Write {@link PCollection PCollections} of
+ * <p>{@link V1Beta3} provides an API to Read, Write and Delete {@link PCollection PCollections} of
  * <a href="https://developers.google.com/datastore/">Google Cloud Datastore</a> version v1beta3
  * {@link Entity} objects.
  *
@@ -129,7 +133,25 @@ import javax.annotation.Nullable;
  * p.run();
  * } </pre>
  *
- * <p>{@link Entity Entities} in the {@code PCollection} to be written must have complete
+ * <p>To delete a {@link PCollection} of {@link Entity Entities} from Datastore, use
+ * {@link V1Beta3#deleteEntity()}, specifying the Cloud Datastore project to delete from:
+ *
+ * <pre> {@code
+ * PCollection<Entity> entities = ...;
+ * entities.apply(DatastoreIO.v1beta3().deleteEntity().withProjectId(projectId));
+ * p.run();
+ * } </pre>
+ *
+ * <p>To delete entities associated with a {@link PCollection} of {@link Key Keys} from Datastore,
+ * use {@link V1Beta3#deleteKey()}, specifying the Cloud Datastore project to delete from:
+ *
+ * <pre> {@code
+ * PCollection<Key> keys = ...;
+ * keys.apply(DatastoreIO.v1beta3().deleteKey().withProjectId(projectId));
+ * p.run();
+ * } </pre>
+ *
+ * <p>{@link Entity Entities} in the {@code PCollection} to be written or deleted must have complete
  * {@link Key Keys}. Complete {@code Keys} specify the {@code name} and {@code id} of the
  * {@code Entity}, where incomplete {@code Keys} do not. A {@code namespace} other than
  * {@code projectId} default may be used by specifying it in the {@code Entity} {@code Keys}.
@@ -139,9 +161,9 @@ import javax.annotation.Nullable;
  * keyBuilder.getPartitionIdBuilder().setNamespace(namespace);
  * }</pre>
  *
- * <p>{@code Entities} will be committed as upsert (update or insert) mutations. Please read
- * <a href="https://cloud.google.com/datastore/docs/concepts/entities">Entities, Properties, and
- * Keys</a> for more information about {@code Entity} keys.
+ * <p>{@code Entities} will be committed as upsert (update or insert) or delete mutations. Please
+ * read <a href="https://cloud.google.com/datastore/docs/concepts/entities">Entities, Properties,
+ * and Keys</a> for more information about {@code Entity} keys.
  *
  * <p><h3>Permissions</h3>
  * Permission requirements depend on the {@code PipelineRunner} that is used to execute the
@@ -642,20 +664,33 @@ public class V1Beta3 {
   }
 
   /**
+   * Returns an empty {@link DeleteEntity} builder. Configure the destination
+   * {@code projectId} using {@link DeleteEntity#withProjectId}.
+   */
+  public DeleteEntity deleteEntity() {
+    return new DeleteEntity(null);
+  }
+
+  /**
+   * Returns an empty {@link DeleteKey} builder. Configure the destination
+   * {@code projectId} using {@link DeleteKey#withProjectId}.
+   */
+  public DeleteKey deleteKey() {
+    return new DeleteKey(null);
+  }
+
+  /**
    * A {@link PTransform} that writes {@link Entity} objects to Cloud Datastore.
    *
    * @see DatastoreIO
    */
-  public static class Write extends PTransform<PCollection<Entity>, PDone> {
-    @Nullable
-    private final String projectId;
-
+  public static class Write extends Mutate<Entity> {
     /**
      * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if
      * it is {@code null} at instantiation time, an error will be thrown.
      */
-    public Write(@Nullable String projectId) {
-      this.projectId = projectId;
+    Write(@Nullable String projectId) {
+      super(projectId, new UpsertFn());
     }
 
     /**
@@ -665,27 +700,99 @@ public class V1Beta3 {
       checkNotNull(projectId, "projectId");
       return new Write(projectId);
     }
+  }
 
-    @Override
-    public PDone apply(PCollection<Entity> input) {
-      input.apply(ParDo.of(new DatastoreWriterFn(projectId)));
-      return PDone.in(input.getPipeline());
+  /**
+   * A {@link PTransform} that deletes {@link Entity Entities} from Cloud Datastore.
+   *
+   * @see DatastoreIO
+   */
+  public static class DeleteEntity extends Mutate<Entity> {
+    /**
+     * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if
+     * it is {@code null} at instantiation time, an error will be thrown.
+     */
+    DeleteEntity(@Nullable String projectId) {
+      super(projectId, new DeleteEntityFn());
     }
 
-    @Override
-    public void validate(PCollection<Entity> input) {
+    /**
+     * Returns a new {@link DeleteEntity} that deletes entities from the Cloud Datastore for the
+     * specified project.
+     */
+    public DeleteEntity withProjectId(String projectId) {
+      checkNotNull(projectId, "projectId");
+      return new DeleteEntity(projectId);
+    }
+  }
+
+  /**
+   * A {@link PTransform} that deletes {@link Entity Entities} associated with the given
+   * {@link Key Keys} from Cloud Datastore.
+   *
+   * @see DatastoreIO
+   */
+  public static class DeleteKey extends Mutate<Key> {
+    /**
+     * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if
+     * it is {@code null} at instantiation time, an error will be thrown.
+     */
+    DeleteKey(@Nullable String projectId) {
+      super(projectId, new DeleteKeyFn());
+    }
+
+    /**
+     * Returns a new {@link DeleteKey} that deletes entities from the Cloud Datastore for the
+     * specified project.
+     */
+    public DeleteKey withProjectId(String projectId) {
       checkNotNull(projectId, "projectId");
+      return new DeleteKey(projectId);
     }
+  }
 
+  /**
+   * A {@link PTransform} that writes mutations to Cloud Datastore.
+   *
+   * <p>It requires a {@link DoFn} that transforms an object of type {@code T} to a {@link Mutation}.
+   * {@code T} is usually either an {@link Entity} or a {@link Key}.
+   * <b>Note:</b> Only idempotent Cloud Datastore mutation operations (upsert and delete) should
+   * be used by the {@code DoFn} provided, as the commits are retried when failures occur.
+   */
+  private abstract static class Mutate<T> extends PTransform<PCollection<T>, PDone> {
     @Nullable
-    public String getProjectId() {
-      return projectId;
+    private final String projectId;
+    /** A function that transforms each {@code T} into a mutation. */
+    private final SimpleFunction<T, Mutation> mutationFn;
+
+    /**
+     * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if
+     * it is {@code null} at instantiation time, an error will be thrown.
+     */
+    Mutate(@Nullable String projectId, SimpleFunction<T, Mutation> mutationFn) {
+      this.projectId = projectId;
+      this.mutationFn = checkNotNull(mutationFn);
+    }
+
+    @Override
+    public PDone apply(PCollection<T> input) {
+      input.apply("Convert to Mutation", MapElements.via(mutationFn))
+          .apply("Write Mutation to Datastore", ParDo.of(new DatastoreWriterFn(projectId)));
+
+      return PDone.in(input.getPipeline());
+    }
+
+    @Override
+    public void validate(PCollection<T> input) {
+      checkNotNull(projectId, "projectId");
+      checkNotNull(mutationFn, "mutationFn");
     }
 
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(getClass())
           .add("projectId", projectId)
+          .add("mutationFn", mutationFn.getClass().getName())
           .toString();
     }
 
@@ -694,141 +801,200 @@ public class V1Beta3 {
       super.populateDisplayData(builder);
       builder
           .addIfNotNull(DisplayData.item("projectId", projectId)
-              .withLabel("Output Project"));
+              .withLabel("Output Project"))
+          .include(mutationFn);
     }
 
+    public String getProjectId() {
+      return projectId;
+    }
+  }
+
+  /**
+   * {@link DoFn} that writes {@link Mutation}s to Cloud Datastore. Mutations are written in
+   * batches, where the maximum batch size is {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}.
+   *
+   * <p>See <a
+   * href="https://cloud.google.com/datastore/docs/concepts/entities">
+   * Datastore: Entities, Properties, and Keys</a> for information about entity keys and mutations.
+   *
+   * <p>Commits are non-transactional.  If a commit fails because of a conflict over an entity
+   * group, the commit will be retried (up to {@link DatastoreWriterFn#MAX_RETRIES}
+   * times). This means that the mutation operation should be idempotent. Thus, the writer should
+   * only be used for {@code upsert} and {@code delete} mutation operations, as these are the only
+   * two Cloud Datastore mutations that are idempotent.
+   */
+  @VisibleForTesting
+  static class DatastoreWriterFn extends DoFn<Mutation, Void> {
+    private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriterFn.class);
+    private final String projectId;
+    private transient Datastore datastore;
+    private final V1Beta3DatastoreFactory datastoreFactory;
+    // Current batch of mutations to be written.
+    private final List<Mutation> mutations = new ArrayList<>();
+    /**
+     * Since a bundle is written in batches, we should retry the commit of a batch in order to
+     * prevent transient errors from causing the bundle to fail.
+     */
+    private static final int MAX_RETRIES = 5;
+
     /**
-     * A {@link DoFn} that writes {@link Entity} objects to Cloud Datastore. Entities are written in
-     * batches, where the maximum batch size is {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}.
-     * Entities are committed as upsert mutations (either update if the key already exists, or
-     * insert if it is a new key). If an entity does not have a complete key (i.e., it has no name
-     * or id), the bundle will fail.
-     *
-     * <p>See <a
-     * href="https://cloud.google.com/datastore/docs/concepts/entities">
-     * Datastore: Entities, Properties, and Keys</a> for information about entity keys and entities.
-     *
-     * <p>Commits are non-transactional.  If a commit fails because of a conflict over an entity
-     * group, the commit will be retried (up to {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}
-     * times).
+     * Initial backoff time for exponential backoff for retry attempts.
      */
-    @VisibleForTesting
-    static class DatastoreWriterFn extends DoFn<Entity, Void> {
-      private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriterFn.class);
-      private final String projectId;
-      private transient Datastore datastore;
-      private final V1Beta3DatastoreFactory datastoreFactory;
-      // Current batch of entities to be written.
-      private final List<Entity> entities = new ArrayList<>();
-      /**
-       * Since a bundle is written in batches, we should retry the commit of a batch in order to
-       * prevent transient errors from causing the bundle to fail.
-       */
-      private static final int MAX_RETRIES = 5;
+    private static final int INITIAL_BACKOFF_MILLIS = 5000;
 
-      /**
-       * Initial backoff time for exponential backoff for retry attempts.
-       */
-      private static final int INITIAL_BACKOFF_MILLIS = 5000;
+    DatastoreWriterFn(String projectId) {
+      this(projectId, new V1Beta3DatastoreFactory());
+    }
 
-      public DatastoreWriterFn(String projectId) {
-        this(projectId, new V1Beta3DatastoreFactory());
-      }
+    @VisibleForTesting
+    DatastoreWriterFn(String projectId, V1Beta3DatastoreFactory datastoreFactory) {
+      this.projectId = checkNotNull(projectId, "projectId");
+      this.datastoreFactory = datastoreFactory;
+    }
 
-      @VisibleForTesting
-      DatastoreWriterFn(String projectId, V1Beta3DatastoreFactory datastoreFactory) {
-        this.projectId = checkNotNull(projectId, "projectId");
-        this.datastoreFactory = datastoreFactory;
-      }
+    @StartBundle
+    public void startBundle(Context c) {
+      datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId);
+    }
 
-      @StartBundle
-      public void startBundle(Context c) {
-        datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId);
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      mutations.add(c.element());
+      if (mutations.size() >= V1Beta3.DATASTORE_BATCH_UPDATE_LIMIT) {
+        flushBatch();
       }
+    }
 
-      @ProcessElement
-      public void processElement(ProcessContext c) throws Exception {
-        // Verify that the entity to write has a complete key.
-        if (!isValidKey(c.element().getKey())) {
-          throw new IllegalArgumentException(
-              "Entities to be written to the Datastore must have complete keys");
-        }
-
-        entities.add(c.element());
-        if (entities.size() >= V1Beta3.DATASTORE_BATCH_UPDATE_LIMIT) {
-          flushBatch();
-        }
+    @FinishBundle
+    public void finishBundle(Context c) throws Exception {
+      if (mutations.size() > 0) {
+        flushBatch();
       }
+    }
 
-      @FinishBundle
-      public void finishBundle(Context c) throws Exception {
-        if (entities.size() > 0) {
-          flushBatch();
-        }
-      }
+    /**
+     * Writes a batch of mutations to Cloud Datastore.
+     *
+     * <p>If a commit fails, it will be retried (up to {@link DatastoreWriterFn#MAX_RETRIES}
+     * times). All mutations in the batch will be committed again, even if the commit was partially
+     * successful. If the retry limit is exceeded, the last exception from the Datastore will be
+     * thrown.
+     *
+     * @throws DatastoreException if the commit fails or IOException or InterruptedException if
+     * backing off between retries fails.
+     */
+    private void flushBatch() throws DatastoreException, IOException, InterruptedException {
+      LOG.debug("Writing batch of {} mutations", mutations.size());
+      Sleeper sleeper = Sleeper.DEFAULT;
+      BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
 
-      /**
-       * Writes a batch of entities to Cloud Datastore.
-       *
-       * <p>If a commit fails, it will be retried (up to {@link DatastoreWriterFn#MAX_RETRIES}
-       * times). All entities in the batch will be committed again, even if the commit was partially
-       * successful. If the retry limit is exceeded, the last exception from the Datastore will be
-       * thrown.
-       *
-       * @throws DatastoreException if the commit fails or IOException or InterruptedException if
-       * backing off between retries fails.
-       */
-      private void flushBatch() throws DatastoreException, IOException, InterruptedException {
-        LOG.debug("Writing batch of {} entities", entities.size());
-        Sleeper sleeper = Sleeper.DEFAULT;
-        BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
-
-        while (true) {
-          // Batch upsert entities.
-          try {
-            CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
-            for (Entity entity: entities) {
-              commitRequest.addMutations(makeUpsert(entity));
-            }
-            commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
-            datastore.commit(commitRequest.build());
-            // Break if the commit threw no exception.
-            break;
-          } catch (DatastoreException exception) {
-            // Only log the code and message for potentially-transient errors. The entire exception
-            // will be propagated upon the last retry.
-            LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
-                exception.getMessage());
-            if (!BackOffUtils.next(sleeper, backoff)) {
-              LOG.error("Aborting after {} retries.", MAX_RETRIES);
-              throw exception;
-            }
+      while (true) {
+        // Commit the current batch of mutations.
+        try {
+          CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+          commitRequest.addAllMutations(mutations);
+          commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+          datastore.commit(commitRequest.build());
+          // Break if the commit threw no exception.
+          break;
+        } catch (DatastoreException exception) {
+          // Only log the code and message for potentially-transient errors. The entire exception
+          // will be propagated upon the last retry.
+          LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
+              exception.getMessage());
+          if (!BackOffUtils.next(sleeper, backoff)) {
+            LOG.error("Aborting after {} retries.", MAX_RETRIES);
+            throw exception;
           }
         }
-        LOG.debug("Successfully wrote {} entities", entities.size());
-        entities.clear();
       }
+      LOG.debug("Successfully wrote {} mutations", mutations.size());
+      mutations.clear();
+    }
 
-      @Override
-      public void populateDisplayData(Builder builder) {
-        super.populateDisplayData(builder);
-        builder
-            .addIfNotNull(DisplayData.item("projectId", projectId)
-                .withLabel("Output Project"));
-      }
+    @Override
+    public void populateDisplayData(Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .addIfNotNull(DisplayData.item("projectId", projectId)
+              .withLabel("Output Project"));
     }
+  }
 
-    /**
-     * Returns true if a Datastore key is complete.  A key is complete if its last element
-     * has either an id or a name.
-     */
-    static boolean isValidKey(Key key) {
-      List<PathElement> elementList = key.getPathList();
-      if (elementList.isEmpty()) {
-        return false;
-      }
-      PathElement lastElement = elementList.get(elementList.size() - 1);
-      return (lastElement.getId() != 0 || !lastElement.getName().isEmpty());
+  /**
+   * Returns true if a Datastore key is complete. A key is complete if its last element
+   * has either an id or a name.
+   */
+  static boolean isValidKey(Key key) {
+    List<PathElement> elementList = key.getPathList();
+    if (elementList.isEmpty()) {
+      return false;
+    }
+    PathElement lastElement = elementList.get(elementList.size() - 1);
+    return (lastElement.getId() != 0 || !lastElement.getName().isEmpty());
+  }
+
+  /**
+   * A function that constructs an upsert {@link Mutation} from an {@link Entity}.
+   */
+  @VisibleForTesting
+  static class UpsertFn extends SimpleFunction<Entity, Mutation> {
+    @Override
+    public Mutation apply(Entity entity) {
+      // Verify that the entity to write has a complete key.
+      checkArgument(isValidKey(entity.getKey()),
+          "Entities to be written to the Datastore must have complete keys:\n%s", entity);
+
+      return makeUpsert(entity).build();
+    }
+
+    @Override
+    public void populateDisplayData(Builder builder) {
+      builder.add(DisplayData.item("upsertFn", this.getClass())
+          .withLabel("Create Upsert Mutation"));
+    }
+  }
+
+  /**
+   * A function that constructs a delete {@link Mutation} from an {@link Entity}.
+   */
+  @VisibleForTesting
+  static class DeleteEntityFn extends SimpleFunction<Entity, Mutation> {
+    @Override
+    public Mutation apply(Entity entity) {
+      // Verify that the entity to delete has a complete key.
+      checkArgument(isValidKey(entity.getKey()),
+          "Entities to be deleted from the Datastore must have complete keys:\n%s", entity);
+
+      return makeDelete(entity.getKey()).build();
+    }
+
+    @Override
+    public void populateDisplayData(Builder builder) {
+      builder.add(DisplayData.item("deleteEntityFn", this.getClass())
+          .withLabel("Create Delete Mutation"));
+    }
+  }
+
+  /**
+   * A function that constructs a delete {@link Mutation} from a {@link Key}.
+   */
+  @VisibleForTesting
+  static class DeleteKeyFn extends SimpleFunction<Key, Mutation> {
+    @Override
+    public Mutation apply(Key key) {
+      // Verify that the key to delete is complete.
+      checkArgument(isValidKey(key),
+          "Keys to be deleted from the Datastore must be complete:\n%s", key);
+
+      return makeDelete(key).build();
+    }
+
+    @Override
+    public void populateDisplayData(Builder builder) {
+      builder.add(DisplayData.item("deleteKeyFn", this.getClass())
+          .withLabel("Create Delete Mutation"));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/95330658/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
index 8fa34da..b0c6c18 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
@@ -22,9 +22,11 @@ import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.DEFAULT_BUNDLE_S
 import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.QUERY_BATCH_LIMIT;
 import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.getEstimatedSizeBytes;
 import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.makeRequest;
+import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.isValidKey;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static com.google.datastore.v1beta3.PropertyFilter.Operator.EQUAL;
 import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeDelete;
 import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
 import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;
 import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder;
@@ -45,12 +47,17 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DatastoreWriterFn;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteEntity;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteEntityFn;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteKey;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteKeyFn;
 import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.ReadFn;
 import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.SplitQueryFn;
 import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.V1Beta3Options;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.UpsertFn;
 import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.V1Beta3DatastoreFactory;
 import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Write;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Write.DatastoreWriterFn;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.transforms.DoFnTester;
@@ -67,6 +74,7 @@ import com.google.datastore.v1beta3.CommitRequest;
 import com.google.datastore.v1beta3.Entity;
 import com.google.datastore.v1beta3.EntityResult;
 import com.google.datastore.v1beta3.Key;
+import com.google.datastore.v1beta3.Mutation;
 import com.google.datastore.v1beta3.PartitionId;
 import com.google.datastore.v1beta3.Query;
 import com.google.datastore.v1beta3.QueryResultBatch;
@@ -233,7 +241,7 @@ public class V1Beta3Test {
 
   @Test
   public void testWriteValidationFailsWithNoProject() throws Exception {
-    V1Beta3.Write write =  DatastoreIO.v1beta3().write();
+    Write write =  DatastoreIO.v1beta3().write();
 
     thrown.expect(NullPointerException.class);
     thrown.expectMessage("projectId");
@@ -242,15 +250,14 @@ public class V1Beta3Test {
   }
 
   @Test
-  public void testSinkValidationSucceedsWithProject() throws Exception {
-    V1Beta3.Write write =  DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID);
+  public void testWriteValidationSucceedsWithProject() throws Exception {
+    Write write =  DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID);
     write.validate(null);
   }
 
   @Test
   public void testWriteDisplayData() {
-    V1Beta3.Write write =  DatastoreIO.v1beta3().write()
-        .withProjectId(PROJECT_ID);
+    Write write =  DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID);
 
     DisplayData displayData = DisplayData.from(write);
 
@@ -258,8 +265,74 @@ public class V1Beta3Test {
   }
 
   @Test
+  public void testDeleteEntityDoesNotAllowNullProject() throws Exception {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("projectId");
+
+    DatastoreIO.v1beta3().deleteEntity().withProjectId(null);
+  }
+
+  @Test
+  public void testDeleteEntityValidationFailsWithNoProject() throws Exception {
+    DeleteEntity deleteEntity = DatastoreIO.v1beta3().deleteEntity();
+
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("projectId");
+
+    deleteEntity.validate(null);
+  }
+
+  @Test
+  public void testDeleteEntityValidationSucceedsWithProject() throws Exception {
+    DeleteEntity deleteEntity = DatastoreIO.v1beta3().deleteEntity().withProjectId(PROJECT_ID);
+    deleteEntity.validate(null);
+  }
+
+  @Test
+  public void testDeleteEntityDisplayData() {
+    DeleteEntity deleteEntity =  DatastoreIO.v1beta3().deleteEntity().withProjectId(PROJECT_ID);
+
+    DisplayData displayData = DisplayData.from(deleteEntity);
+
+    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+  }
+
+  @Test
+  public void testDeleteKeyDoesNotAllowNullProject() throws Exception {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("projectId");
+
+    DatastoreIO.v1beta3().deleteKey().withProjectId(null);
+  }
+
+  @Test
+  public void testDeleteKeyValidationFailsWithNoProject() throws Exception {
+    DeleteKey deleteKey = DatastoreIO.v1beta3().deleteKey();
+
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("projectId");
+
+    deleteKey.validate(null);
+  }
+
+  @Test
+  public void testDeleteKeyValidationSucceedsWithProject() throws Exception {
+    DeleteKey deleteKey = DatastoreIO.v1beta3().deleteKey().withProjectId(PROJECT_ID);
+    deleteKey.validate(null);
+  }
+
+  @Test
+  public void testDeleteKeyDisplayData() {
+    DeleteKey deleteKey =  DatastoreIO.v1beta3().deleteKey().withProjectId(PROJECT_ID);
+
+    DisplayData displayData = DisplayData.from(deleteKey);
+
+    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+  }
+
+  @Test
   @Category(RunnableOnService.class)
-  public void testSinkPrimitiveDisplayData() {
+  public void testWritePrimitiveDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
     PTransform<PCollection<Entity>, ?> write =
         DatastoreIO.v1beta3().write().withProjectId("myProject");
@@ -267,6 +340,39 @@ public class V1Beta3Test {
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
     assertThat("DatastoreIO write should include the project in its primitive display data",
         displayData, hasItem(hasDisplayItem("projectId")));
+    assertThat("DatastoreIO write should include the upsertFn in its primitive display data",
+        displayData, hasItem(hasDisplayItem("upsertFn")));
+
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testDeleteEntityPrimitiveDisplayData() {
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+    PTransform<PCollection<Entity>, ?> write =
+        DatastoreIO.v1beta3().deleteEntity().withProjectId("myProject");
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
+    assertThat("DatastoreIO write should include the project in its primitive display data",
+        displayData, hasItem(hasDisplayItem("projectId")));
+    assertThat("DatastoreIO write should include the deleteEntityFn in its primitive display data",
+        displayData, hasItem(hasDisplayItem("deleteEntityFn")));
+
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testDeleteKeyPrimitiveDisplayData() {
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+    PTransform<PCollection<Key>, ?> write =
+        DatastoreIO.v1beta3().deleteKey().withProjectId("myProject");
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
+    assertThat("DatastoreIO write should include the project in its primitive display data",
+        displayData, hasItem(hasDisplayItem("projectId")));
+    assertThat("DatastoreIO write should include the deleteKeyFn in its primitive display data",
+        displayData, hasItem(hasDisplayItem("deleteKeyFn")));
+
   }
 
   /**
@@ -286,33 +392,33 @@ public class V1Beta3Test {
     Key key;
     // Complete with name, no ancestor
     key = makeKey("bird", "finch").build();
-    assertTrue(Write.isValidKey(key));
+    assertTrue(isValidKey(key));
 
     // Complete with id, no ancestor
     key = makeKey("bird", 123).build();
-    assertTrue(Write.isValidKey(key));
+    assertTrue(isValidKey(key));
 
     // Incomplete, no ancestor
     key = makeKey("bird").build();
-    assertFalse(Write.isValidKey(key));
+    assertFalse(isValidKey(key));
 
     // Complete with name and ancestor
     key = makeKey("bird", "owl").build();
     key = makeKey(key, "bird", "horned").build();
-    assertTrue(Write.isValidKey(key));
+    assertTrue(isValidKey(key));
 
     // Complete with id and ancestor
     key = makeKey("bird", "owl").build();
     key = makeKey(key, "bird", 123).build();
-    assertTrue(Write.isValidKey(key));
+    assertTrue(isValidKey(key));
 
     // Incomplete with ancestor
     key = makeKey("bird", "owl").build();
     key = makeKey(key, "bird").build();
-    assertFalse(Write.isValidKey(key));
+    assertFalse(isValidKey(key));
 
     key = makeKey().build();
-    assertFalse(Write.isValidKey(key));
+    assertFalse(isValidKey(key));
   }
 
   /**
@@ -322,14 +428,86 @@ public class V1Beta3Test {
   public void testAddEntitiesWithIncompleteKeys() throws Exception {
     Key key = makeKey("bird").build();
     Entity entity = Entity.newBuilder().setKey(key).build();
-    DatastoreWriterFn writer = new DatastoreWriterFn(PROJECT_ID, mockDatastoreFactory);
-    DoFnTester<Entity, Void> doFnTester = DoFnTester.of(writer);
-    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+    UpsertFn upsertFn = new UpsertFn();
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("Entities to be written to the Datastore must have complete keys");
 
-    doFnTester.processBundle(entity);
+    upsertFn.apply(entity);
+  }
+
+  /**
+   * Test that entities with valid keys are transformed to upsert mutations.
+   */
+  @Test
+  public void testAddEntities() throws Exception {
+    Key key = makeKey("bird", "finch").build();
+    Entity entity = Entity.newBuilder().setKey(key).build();
+    UpsertFn upsertFn = new UpsertFn();
+
+    Mutation expectedMutation = makeUpsert(entity).build();
+    assertEquals(expectedMutation, upsertFn.apply(entity));
+  }
+
+  /**
+   * Test that entities with incomplete keys cannot be deleted.
+   */
+  @Test
+  public void testDeleteEntitiesWithIncompleteKeys() throws Exception {
+    Key key = makeKey("bird").build();
+    Entity entity = Entity.newBuilder().setKey(key).build();
+    DeleteEntityFn deleteEntityFn = new DeleteEntityFn();
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Entities to be deleted from the Datastore must have complete keys");
+
+    deleteEntityFn.apply(entity);
+  }
+
+  /**
+   * Test that entities with valid keys are transformed to delete mutations.
+   */
+  @Test
+  public void testDeleteEntities() throws Exception {
+    Key key = makeKey("bird", "finch").build();
+    Entity entity = Entity.newBuilder().setKey(key).build();
+    DeleteEntityFn deleteEntityFn = new DeleteEntityFn();
+
+    Mutation expectedMutation = makeDelete(entity.getKey()).build();
+    assertEquals(expectedMutation, deleteEntityFn.apply(entity));
+  }
+
+  /**
+   * Test that incomplete keys cannot be deleted.
+   */
+  @Test
+  public void testDeleteIncompleteKeys() throws Exception {
+    Key key = makeKey("bird").build();
+    DeleteKeyFn deleteKeyFn = new DeleteKeyFn();
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Keys to be deleted from the Datastore must be complete");
+
+    deleteKeyFn.apply(key);
+  }
+
+  /**
+   * Test that valid keys are transformed to delete mutations.
+   */
+  @Test
+  public void testDeleteKeys() throws Exception {
+    Key key = makeKey("bird", "finch").build();
+    DeleteKeyFn deleteKeyFn = new DeleteKeyFn();
+
+    Mutation expectedMutation = makeDelete(key).build();
+    assertEquals(expectedMutation, deleteKeyFn.apply(key));
+  }
+
+  @Test
+  public void testDatastoreWriteFnDisplayData() {
+    DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID);
+    DisplayData displayData = DisplayData.from(datastoreWriter);
+    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
   }
 
   /** Tests {@link DatastoreWriterFn} with entities less than one batch. */
@@ -354,27 +532,26 @@ public class V1Beta3Test {
   }
 
   // A helper method to test DatastoreWriterFn for various batch sizes.
-  private void datastoreWriterFnTest(int numEntities) throws Exception {
+  private void datastoreWriterFnTest(int numMutations) throws Exception {
     // Create the requested number of mutations.
-    List<Entity> entities = new ArrayList<>(numEntities);
-    for (int i = 0; i < numEntities; ++i) {
-      entities.add(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build());
+    List<Mutation> mutations = new ArrayList<>(numMutations);
+    for (int i = 0; i < numMutations; ++i) {
+      mutations.add(
+          makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build());
     }
 
     DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, mockDatastoreFactory);
-    DoFnTester<Entity, Void> doFnTester = DoFnTester.of(datastoreWriter);
+    DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
     doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
-    doFnTester.processBundle(entities);
+    doFnTester.processBundle(mutations);
 
     int start = 0;
-    while (start < numEntities) {
-      int end = Math.min(numEntities, start + DATASTORE_BATCH_UPDATE_LIMIT);
+    while (start < numMutations) {
+      int end = Math.min(numMutations, start + DATASTORE_BATCH_UPDATE_LIMIT);
       CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
       commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
-      for (Entity entity: entities.subList(start, end)) {
-        commitRequest.addMutations(makeUpsert(entity));
-      }
-      // Verify all the batch requests were made with the expected entities.
+      commitRequest.addAllMutations(mutations.subList(start, end));
+      // Verify all the batch requests were made with the expected mutations.
       verify(mockDatastore, times(1)).commit(commitRequest.build());
       start = end;
     }



[2/2] incubator-beam git commit: Closes #845

Posted by dh...@apache.org.
Closes #845


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/78fda451
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/78fda451
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/78fda451

Branch: refs/heads/master
Commit: 78fda4513bd14b57a9a1f8d04f784a6e61001c3e
Parents: 6645dcd 9533065
Author: Dan Halperin <dh...@google.com>
Authored: Fri Aug 19 15:42:24 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Aug 19 15:42:24 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/datastore/V1Beta3.java      | 436 +++++++++++++------
 .../beam/sdk/io/gcp/datastore/V1Beta3Test.java  | 237 ++++++++--
 2 files changed, 508 insertions(+), 165 deletions(-)
----------------------------------------------------------------------