Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/18 00:45:35 UTC

[2/2] incubator-beam git commit: DatastoreIO Sink as ParDo

DatastoreIO Sink as ParDo


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a0361ae9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a0361ae9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a0361ae9

Branch: refs/heads/master
Commit: a0361ae99e9e39bb5ff9766508501932416129ec
Parents: a07648b
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Mon Aug 15 15:28:07 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Aug 17 17:45:05 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/datastore/V1Beta3.java      | 376 +++++++------------
 .../beam/sdk/io/gcp/datastore/V1Beta3Test.java  |  88 +++--
 2 files changed, 195 insertions(+), 269 deletions(-)
----------------------------------------------------------------------
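
In short: V1Beta3.Write no longer goes through the Sink framework (DatastoreSink,
DatastoreWriteOperation and DatastoreWriter are removed); it now expands to a single
ParDo over a new DatastoreWriterFn, keeping the user-facing transform unchanged. For
orientation, a minimal, hypothetical usage sketch follows. The pipeline setup, the
coder choice (ProtoCoder) and the project id are illustrative assumptions, not part
of this commit; only the internal expansion of Write changes here.

    // Hypothetical usage sketch; entity values and project id are made up.
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import com.google.datastore.v1beta3.Entity;
    import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;

    public class DatastoreWriteSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        // Entities must have complete keys (kind plus a name or id); otherwise the
        // new DatastoreWriterFn fails the bundle with an IllegalArgumentException.
        Entity finch = Entity.newBuilder().setKey(makeKey("bird", "finch")).build();
        p.apply(Create.of(finch).withCoder(ProtoCoder.of(Entity.class)))
            // After this commit, Write expands to ParDo.of(new DatastoreWriterFn(projectId))
            // rather than org.apache.beam.sdk.io.Write.to(new DatastoreSink(projectId)).
            .apply(DatastoreIO.v1beta3().write().withProjectId("my-project-id"));
        p.run();
      }
    }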


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0361ae9/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
index 052feb3..0d2e2cb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
@@ -30,10 +30,6 @@ import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert;
 import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
 
 import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.io.Sink.WriteOperation;
-import org.apache.beam.sdk.io.Sink.Writer;
 import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
@@ -167,7 +163,8 @@ public class V1Beta3 {
    * Datastore has a limit of 500 mutations per batch operation, so we flush
    * changes to Datastore every 500 entities.
    */
-  private static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
+  @VisibleForTesting
+  static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
 
   /**
    * Returns an empty {@link V1Beta3.Read} builder. Configure the source {@code projectId},
@@ -634,42 +631,8 @@ public class V1Beta3 {
         }
       }
     }
-
-    /**
-     * A wrapper factory class for Datastore singleton classes {@link DatastoreFactory} and
-     * {@link QuerySplitter}
-     *
-     * <p>{@link DatastoreFactory} and {@link QuerySplitter} are not java serializable, hence
-     * wrapping them under this class, which implements {@link Serializable}.
-     */
-    @VisibleForTesting
-    static class V1Beta3DatastoreFactory implements Serializable {
-
-      /** Builds a Datastore client for the given pipeline options and project. */
-      public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) {
-        DatastoreOptions.Builder builder =
-            new DatastoreOptions.Builder()
-                .projectId(projectId)
-                .initializer(
-                    new RetryHttpRequestInitializer()
-                );
-
-        Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
-        if (credential != null) {
-          builder.credential(credential);
-        }
-
-        return DatastoreFactory.get().create(builder.build());
-      }
-
-      /** Builds a Datastore {@link QuerySplitter}. */
-      public QuerySplitter getQuerySplitter() {
-        return DatastoreHelper.getQuerySplitter();
-      }
-    }
   }
 
-
   /**
    * Returns an empty {@link V1Beta3.Write} builder. Configure the destination
    * {@code projectId} using {@link V1Beta3.Write#withProjectId}.
@@ -705,8 +668,8 @@ public class V1Beta3 {
 
     @Override
     public PDone apply(PCollection<Entity> input) {
-      return input.apply(
-          org.apache.beam.sdk.io.Write.to(new DatastoreSink(projectId)));
+      input.apply(ParDo.of(new DatastoreWriterFn(projectId)));
+      return PDone.in(input.getPipeline());
     }
 
     @Override
@@ -733,130 +696,127 @@ public class V1Beta3 {
           .addIfNotNull(DisplayData.item("projectId", projectId)
               .withLabel("Output Project"));
     }
-  }
 
-  /**
-   * A {@link org.apache.beam.sdk.io.Sink} that writes data to Datastore.
-   */
-  static class DatastoreSink extends org.apache.beam.sdk.io.Sink<Entity> {
-    final String projectId;
-
-    public DatastoreSink(String projectId) {
-      this.projectId = projectId;
-    }
-
-    @Override
-    public void validate(PipelineOptions options) {
-      checkNotNull(projectId, "projectId");
-    }
-
-    @Override
-    public DatastoreWriteOperation createWriteOperation(PipelineOptions options) {
-      return new DatastoreWriteOperation(this);
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      builder
-          .addIfNotNull(DisplayData.item("projectId", projectId)
-              .withLabel("Output Project"));
-    }
-  }
+    /**
+     * A {@link DoFn} that writes {@link Entity} objects to Cloud Datastore. Entities are written in
+     * batches, where the maximum batch size is {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}.
+     * Entities are committed as upsert mutations (either update if the key already exists, or
+     * insert if it is a new key). If an entity does not have a complete key (i.e., it has no name
+     * or id), the bundle will fail.
+     *
+     * <p>See <a
+     * href="https://cloud.google.com/datastore/docs/concepts/entities">
+     * Datastore: Entities, Properties, and Keys</a> for information about entities and their keys.
+     *
+     * <p>Commits are non-transactional.  If a commit fails because of a conflict over an entity
+     * group, the commit will be retried (up to {@link DatastoreWriterFn#MAX_RETRIES} times).
+     */
+    @VisibleForTesting
+    static class DatastoreWriterFn extends DoFn<Entity, Void> {
+      private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriterFn.class);
+      private final String projectId;
+      private transient Datastore datastore;
+      private final V1Beta3DatastoreFactory datastoreFactory;
+      // Current batch of entities to be written.
+      private final List<Entity> entities = new ArrayList<>();
+      /**
+       * Since a bundle is written in batches, we should retry the commit of a batch in order to
+       * prevent transient errors from causing the bundle to fail.
+       */
+      private static final int MAX_RETRIES = 5;
 
-  /**
-   * A {@link WriteOperation} that will manage a parallel write to a Datastore sink.
-   */
-  private static class DatastoreWriteOperation
-      extends WriteOperation<Entity, DatastoreWriteResult> {
-    private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriteOperation.class);
+      /**
+       * Initial backoff time for exponential backoff for retry attempts.
+       */
+      private static final int INITIAL_BACKOFF_MILLIS = 5000;
 
-    private final DatastoreSink sink;
+      public DatastoreWriterFn(String projectId) {
+        this(projectId, new V1Beta3DatastoreFactory());
+      }
 
-    public DatastoreWriteOperation(DatastoreSink sink) {
-      this.sink = sink;
-    }
+      @VisibleForTesting
+      DatastoreWriterFn(String projectId, V1Beta3DatastoreFactory datastoreFactory) {
+        this.projectId = checkNotNull(projectId, "projectId");
+        this.datastoreFactory = datastoreFactory;
+      }
 
-    @Override
-    public Coder<DatastoreWriteResult> getWriterResultCoder() {
-      return SerializableCoder.of(DatastoreWriteResult.class);
-    }
+      @StartBundle
+      public void startBundle(Context c) {
+        datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId);
+      }
 
-    @Override
-    public void initialize(PipelineOptions options) throws Exception {}
+      @ProcessElement
+      public void processElement(ProcessContext c) throws Exception {
+        // Verify that the entity to write has a complete key.
+        if (!isValidKey(c.element().getKey())) {
+          throw new IllegalArgumentException(
+              "Entities to be written to the Datastore must have complete keys");
+        }
 
-    /**
-     * Finalizes the write.  Logs the number of entities written to the Datastore.
-     */
-    @Override
-    public void finalize(Iterable<DatastoreWriteResult> writerResults, PipelineOptions options)
-        throws Exception {
-      long totalEntities = 0;
-      for (DatastoreWriteResult result : writerResults) {
-        totalEntities += result.entitiesWritten;
+        entities.add(c.element());
+        if (entities.size() >= V1Beta3.DATASTORE_BATCH_UPDATE_LIMIT) {
+          flushBatch();
+        }
       }
-      LOG.info("Wrote {} elements.", totalEntities);
-    }
 
-    @Override
-    public DatastoreWriter createWriter(PipelineOptions options) throws Exception {
-      DatastoreOptions.Builder builder =
-          new DatastoreOptions.Builder()
-              .projectId(sink.projectId)
-              .initializer(new RetryHttpRequestInitializer());
-      Credential credential = options.as(GcpOptions.class).getGcpCredential();
-      if (credential != null) {
-        builder.credential(credential);
+      @FinishBundle
+      public void finishBundle(Context c) throws Exception {
+        if (entities.size() > 0) {
+          flushBatch();
+        }
       }
-      Datastore datastore = DatastoreFactory.get().create(builder.build());
 
-      return new DatastoreWriter(this, datastore);
-    }
+      /**
+       * Writes a batch of entities to Cloud Datastore.
+       *
+       * <p>If a commit fails, it will be retried (up to {@link DatastoreWriterFn#MAX_RETRIES}
+       * times). All entities in the batch will be committed again, even if the commit was partially
+       * successful. If the retry limit is exceeded, the last exception from the Datastore will be
+       * thrown.
+       *
+       * @throws DatastoreException if the commit fails; IOException or InterruptedException if
+       * backing off between retries fails.
+       */
+      private void flushBatch() throws DatastoreException, IOException, InterruptedException {
+        LOG.debug("Writing batch of {} entities", entities.size());
+        Sleeper sleeper = Sleeper.DEFAULT;
+        BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
+
+        while (true) {
+          // Batch upsert entities.
+          try {
+            CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+            for (Entity entity: entities) {
+              commitRequest.addMutations(makeUpsert(entity));
+            }
+            commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+            datastore.commit(commitRequest.build());
+            // Break if the commit threw no exception.
+            break;
+          } catch (DatastoreException exception) {
+            // Only log the code and message for potentially-transient errors. The entire exception
+            // will be propagated upon the last retry.
+            LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
+                exception.getMessage());
+            if (!BackOffUtils.next(sleeper, backoff)) {
+              LOG.error("Aborting after {} retries.", MAX_RETRIES);
+              throw exception;
+            }
+          }
+        }
+        LOG.debug("Successfully wrote {} entities", entities.size());
+        entities.clear();
+      }
 
-    @Override
-    public DatastoreSink getSink() {
-      return sink;
+      @Override
+      public void populateDisplayData(Builder builder) {
+        super.populateDisplayData(builder);
+        builder
+            .addIfNotNull(DisplayData.item("projectId", projectId)
+                .withLabel("Output Project"));
+      }
     }
-  }
-
-  /**
-   * {@link Writer} that writes entities to a Datastore Sink.  Entities are written in batches,
-   * where the maximum batch size is {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}.  Entities
-   * are committed as upsert mutations (either update if the key already exists, or insert if it is
-   * a new key).  If an entity does not have a complete key (i.e., it has no name or id), the bundle
-   * will fail.
-   *
-   * <p>See <a
-   * href="https://cloud.google.com/datastore/docs/concepts/entities#Datastore_Creating_an_entity">
-   * Datastore: Entities, Properties, and Keys</a> for information about entity keys and upsert
-   * mutations.
-   *
-   * <p>Commits are non-transactional.  If a commit fails because of a conflict over an entity
-   * group, the commit will be retried (up to {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}
-   * times).
-   *
-   * <p>Visible for testing purposes.
-   */
-  @VisibleForTesting
-  static class DatastoreWriter extends Writer<Entity, DatastoreWriteResult> {
-    private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriter.class);
-    private final DatastoreWriteOperation writeOp;
-    private final Datastore datastore;
-    private long totalWritten = 0;
-
-    // Visible for testing.
-    final List<Entity> entities = new ArrayList<>();
-
-    /**
-     * Since a bundle is written in batches, we should retry the commit of a batch in order to
-     * prevent transient errors from causing the bundle to fail.
-     */
-    private static final int MAX_RETRIES = 5;
-
-    /**
-     * Initial backoff time for exponential backoff for retry attempts.
-     */
-    private static final int INITIAL_BACKOFF_MILLIS = 5000;
 
     /**
      * Returns true if a Datastore key is complete.  A key is complete if its last element
@@ -870,100 +830,38 @@ public class V1Beta3 {
       PathElement lastElement = elementList.get(elementList.size() - 1);
       return (lastElement.getId() != 0 || !lastElement.getName().isEmpty());
     }
+  }
 
-    DatastoreWriter(DatastoreWriteOperation writeOp, Datastore datastore) {
-      this.writeOp = writeOp;
-      this.datastore = datastore;
-    }
-
-    @Override
-    public void open(String uId) throws Exception {}
-
-    /**
-     * Writes an entity to the Datastore.  Writes are batched, up to {@link
-     * V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}. If an entity does not have a complete key, an
-     * {@link IllegalArgumentException} will be thrown.
-     */
-    @Override
-    public void write(Entity value) throws Exception {
-      // Verify that the entity to write has a complete key.
-      if (!isValidKey(value.getKey())) {
-        throw new IllegalArgumentException(
-            "Entities to be written to the Datastore must have complete keys");
-      }
-
-      entities.add(value);
+  /**
+   * A wrapper factory class for the Datastore singleton classes {@link DatastoreFactory} and
+   * {@link QuerySplitter}.
+   *
+   * <p>{@link DatastoreFactory} and {@link QuerySplitter} are not Java serializable, hence they
+   * are wrapped in this class, which implements {@link Serializable}.
+   */
+  @VisibleForTesting
+  static class V1Beta3DatastoreFactory implements Serializable {
 
-      if (entities.size() >= V1Beta3.DATASTORE_BATCH_UPDATE_LIMIT) {
-        flushBatch();
-      }
-    }
+    /** Builds a Datastore client for the given pipeline options and project. */
+    public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) {
+      DatastoreOptions.Builder builder =
+          new DatastoreOptions.Builder()
+              .projectId(projectId)
+              .initializer(
+                  new RetryHttpRequestInitializer()
+              );
 
-    /**
-     * Flushes any pending batch writes and returns a DatastoreWriteResult.
-     */
-    @Override
-    public DatastoreWriteResult close() throws Exception {
-      if (entities.size() > 0) {
-        flushBatch();
+      Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
+      if (credential != null) {
+        builder.credential(credential);
       }
-      return new DatastoreWriteResult(totalWritten);
-    }
 
-    @Override
-    public DatastoreWriteOperation getWriteOperation() {
-      return writeOp;
+      return DatastoreFactory.get().create(builder.build());
     }
 
-    /**
-     * Writes a batch of entities to the Datastore.
-     *
-     * <p>If a commit fails, it will be retried (up to {@link DatastoreWriter#MAX_RETRIES}
-     * times).  All entities in the batch will be committed again, even if the commit was partially
-     * successful. If the retry limit is exceeded, the last exception from the Datastore will be
-     * thrown.
-     *
-     * @throws DatastoreException if the commit fails or IOException or InterruptedException if
-     * backing off between retries fails.
-     */
-    private void flushBatch() throws DatastoreException, IOException, InterruptedException {
-      LOG.debug("Writing batch of {} entities", entities.size());
-      Sleeper sleeper = Sleeper.DEFAULT;
-      BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
-
-      while (true) {
-        // Batch upsert entities.
-        try {
-          CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
-          for (Entity entity: entities) {
-            commitRequest.addMutations(makeUpsert(entity));
-          }
-          commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
-          datastore.commit(commitRequest.build());
-          // Break if the commit threw no exception.
-          break;
-        } catch (DatastoreException exception) {
-          // Only log the code and message for potentially-transient errors. The entire exception
-          // will be propagated upon the last retry.
-          LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
-              exception.getMessage());
-          if (!BackOffUtils.next(sleeper, backoff)) {
-            LOG.error("Aborting after {} retries.", MAX_RETRIES);
-            throw exception;
-          }
-        }
-      }
-      totalWritten += entities.size();
-      LOG.debug("Successfully wrote {} entities", entities.size());
-      entities.clear();
-    }
-  }
-
-  private static class DatastoreWriteResult implements Serializable {
-    final long entitiesWritten;
-
-    public DatastoreWriteResult(long recordsWritten) {
-      this.entitiesWritten = recordsWritten;
+    /** Builds a Datastore {@link QuerySplitter}. */
+    public QuerySplitter getQuerySplitter() {
+      return DatastoreHelper.getQuerySplitter();
     }
   }
 }
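
Taken together, the new writer's bundle lifecycle is: build a Datastore client per
bundle, buffer incoming entities, and flush whenever the buffer reaches
DATASTORE_BATCH_UPDATE_LIMIT, with a final flush of the partial tail batch. A
condensed skeleton of DatastoreWriterFn from the diff above (trimmed for orientation;
the real class also validates keys and retries commits with bounded exponential
backoff):

    // Condensed from the DatastoreWriterFn added above; fields such as
    // datastoreFactory and projectId, plus flushBatch(), are as in the diff.
    static class DatastoreWriterFn extends DoFn<Entity, Void> {
      private transient Datastore datastore;                    // rebuilt per bundle
      private final List<Entity> entities = new ArrayList<>();  // current batch buffer

      @StartBundle
      public void startBundle(Context c) {
        // One Datastore client per bundle, built from the pipeline options.
        datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId);
      }

      @ProcessElement
      public void processElement(ProcessContext c) throws Exception {
        entities.add(c.element());  // buffer; the real code validates the key first
        if (entities.size() >= DATASTORE_BATCH_UPDATE_LIMIT) {
          flushBatch();  // commit 500 upserts non-transactionally, retrying up to MAX_RETRIES
        }
      }

      @FinishBundle
      public void finishBundle(Context c) throws Exception {
        if (!entities.isEmpty()) {
          flushBatch();  // commit the partial tail batch
        }
      }
    }

One consequence of leaving the Sink framework: there is no finalize step aggregating
per-writer results (DatastoreWriteResult and the totalWritten counter are gone), so a
successful write is signaled simply by the absence of an exception.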

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0361ae9/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
index 9947c60..8fa34da 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.gcp.datastore;
 
+import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DATASTORE_BATCH_UPDATE_LIMIT;
 import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.DEFAULT_BUNDLE_SIZE_BYTES;
 import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.QUERY_BATCH_LIMIT;
 import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.getEstimatedSizeBytes;
@@ -27,8 +28,8 @@ import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING;
 import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
 import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;
 import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert;
 import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
-import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -44,11 +45,12 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DatastoreWriter;
 import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.ReadFn;
 import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.SplitQueryFn;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.V1Beta3DatastoreFactory;
 import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.V1Beta3Options;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.V1Beta3DatastoreFactory;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Write;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Write.DatastoreWriterFn;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.transforms.DoFnTester;
@@ -61,7 +63,7 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.POutput;
 
-import com.google.common.collect.Lists;
+import com.google.datastore.v1beta3.CommitRequest;
 import com.google.datastore.v1beta3.Entity;
 import com.google.datastore.v1beta3.EntityResult;
 import com.google.datastore.v1beta3.Key;
@@ -87,7 +89,6 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -285,33 +286,33 @@ public class V1Beta3Test {
     Key key;
     // Complete with name, no ancestor
     key = makeKey("bird", "finch").build();
-    assertTrue(DatastoreWriter.isValidKey(key));
+    assertTrue(Write.isValidKey(key));
 
     // Complete with id, no ancestor
     key = makeKey("bird", 123).build();
-    assertTrue(DatastoreWriter.isValidKey(key));
+    assertTrue(Write.isValidKey(key));
 
     // Incomplete, no ancestor
     key = makeKey("bird").build();
-    assertFalse(DatastoreWriter.isValidKey(key));
+    assertFalse(Write.isValidKey(key));
 
     // Complete with name and ancestor
     key = makeKey("bird", "owl").build();
     key = makeKey(key, "bird", "horned").build();
-    assertTrue(DatastoreWriter.isValidKey(key));
+    assertTrue(Write.isValidKey(key));
 
     // Complete with id and ancestor
     key = makeKey("bird", "owl").build();
     key = makeKey(key, "bird", 123).build();
-    assertTrue(DatastoreWriter.isValidKey(key));
+    assertTrue(Write.isValidKey(key));
 
     // Incomplete with ancestor
     key = makeKey("bird", "owl").build();
     key = makeKey(key, "bird").build();
-    assertFalse(DatastoreWriter.isValidKey(key));
+    assertFalse(Write.isValidKey(key));
 
     key = makeKey().build();
-    assertFalse(DatastoreWriter.isValidKey(key));
+    assertFalse(Write.isValidKey(key));
   }
 
   /**
@@ -321,35 +322,62 @@ public class V1Beta3Test {
   public void testAddEntitiesWithIncompleteKeys() throws Exception {
     Key key = makeKey("bird").build();
     Entity entity = Entity.newBuilder().setKey(key).build();
-    DatastoreWriter writer = new DatastoreWriter(null, mockDatastore);
+    DatastoreWriterFn writer = new DatastoreWriterFn(PROJECT_ID, mockDatastoreFactory);
+    DoFnTester<Entity, Void> doFnTester = DoFnTester.of(writer);
+    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("Entities to be written to the Datastore must have complete keys");
 
-    writer.write(entity);
+    doFnTester.processBundle(entity);
+  }
+
+  /** Tests {@link DatastoreWriterFn} with fewer entities than one batch. */
+  @Test
+  public void testDatastoreWriterFnWithOneBatch() throws Exception {
+    datastoreWriterFnTest(100);
+  }
+
+  /** Tests {@link DatastoreWriterFn} with entities spanning more than one batch, but not an exact multiple. */
+  @Test
+  public void testDatastoreWriterFnWithMultipleBatches() throws Exception {
+    datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 3 + 100);
   }
 
   /**
-   * Test that entities are added to the batch to update.
+   * Tests {@link DatastoreWriterFn} with entities spanning several batches, using an exact
+   * multiple of the write batch size.
    */
   @Test
-  public void testAddingEntities() throws Exception {
-    List<Entity> expected = Lists.newArrayList(
-        Entity.newBuilder().setKey(makeKey("bird", "jay").build()).build(),
-        Entity.newBuilder().setKey(makeKey("bird", "condor").build()).build(),
-        Entity.newBuilder().setKey(makeKey("bird", "robin").build()).build());
-
-    List<Entity> allEntities = Lists.newArrayList(expected);
-    Collections.shuffle(allEntities);
-
-    DatastoreWriter writer = new DatastoreWriter(null, mockDatastore);
-    writer.open("test_id");
-    for (Entity entity : allEntities) {
-      writer.write(entity);
+  public void testDatastoreWriterFnWithBatchesExactMultiple() throws Exception {
+    datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 2);
+  }
+
+  // A helper method to test DatastoreWriterFn for various batch sizes.
+  private void datastoreWriterFnTest(int numEntities) throws Exception {
+    // Create the requested number of mutations.
+    List<Entity> entities = new ArrayList<>(numEntities);
+    for (int i = 0; i < numEntities; ++i) {
+      entities.add(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build());
     }
 
-    assertEquals(expected.size(), writer.entities.size());
-    assertThat(writer.entities, containsInAnyOrder(expected.toArray()));
+    DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, mockDatastoreFactory);
+    DoFnTester<Entity, Void> doFnTester = DoFnTester.of(datastoreWriter);
+    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+    doFnTester.processBundle(entities);
+
+    int start = 0;
+    while (start < numEntities) {
+      int end = Math.min(numEntities, start + DATASTORE_BATCH_UPDATE_LIMIT);
+      CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+      commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+      for (Entity entity: entities.subList(start, end)) {
+        commitRequest.addMutations(makeUpsert(entity));
+      }
+      // Verify all the batch requests were made with the expected entities.
+      verify(mockDatastore, times(1)).commit(commitRequest.build());
+      start = end;
+    }
   }
 
   /**