You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/05/19 20:22:32 UTC

[3/4] beam git commit: Comply with byte limit for Datastore Commit.

Comply with byte limit for Datastore Commit.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c4d14f8b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c4d14f8b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c4d14f8b

Branch: refs/heads/master
Commit: c4d14f8be7bb72fd653f1ab8e8080fc2b65f6672
Parents: ce00d24
Author: Colin Phipps <cp...@moria.org.uk>
Authored: Tue Apr 25 15:28:28 2017 +0000
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri May 19 13:11:17 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  | 16 +++++++++
 .../sdk/io/gcp/datastore/DatastoreV1Test.java   | 34 ++++++++++++++++++++
 2 files changed, 50 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c4d14f8b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index 16bb1b4..4cfb801 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -208,6 +208,13 @@ public class DatastoreV1 {
   static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
 
   /**
+   * Cloud Datastore has a limit of 10MB per RPC, so we also flush a batch before the serialized
+   * size of its mutations would reach this value. It is lower than the 10MB RPC limit because it
+   * only accounts for the mutations themselves and not the CommitRequest wrapper around them.
+   */
+  static final int DATASTORE_BATCH_UPDATE_BYTES_LIMIT = 5_000_000;
+
+  /**
    * Returns an empty {@link DatastoreV1.Read} builder. Configure the source {@code projectId},
    * {@code query}, and optionally {@code namespace} and {@code numQuerySplits} using
    * {@link DatastoreV1.Read#withProjectId}, {@link DatastoreV1.Read#withQuery},
@@ -1123,6 +1130,7 @@ public class DatastoreV1 {
     private final V1DatastoreFactory datastoreFactory;
     // Current batch of mutations to be written.
     private final List<Mutation> mutations = new ArrayList<>();
+    private int mutationsSize = 0;  // Accumulated size of protos in mutations.
 
     private static final int MAX_RETRIES = 5;
     private static final FluentBackoff BUNDLE_WRITE_BACKOFF =
@@ -1152,7 +1160,14 @@ public class DatastoreV1 {
 
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
+      Mutation write = c.element();
+      int size = write.getSerializedSize();
+      if (mutations.size() > 0
+          && mutationsSize + size >= DatastoreV1.DATASTORE_BATCH_UPDATE_BYTES_LIMIT) {
+        flushBatch();
+      }
       mutations.add(c.element());
+      mutationsSize += size;
       if (mutations.size() >= DatastoreV1.DATASTORE_BATCH_UPDATE_LIMIT) {
         flushBatch();
       }
@@ -1203,6 +1218,7 @@ public class DatastoreV1 {
       }
       LOG.debug("Successfully wrote {} mutations", mutations.size());
       mutations.clear();
+      mutationsSize = 0;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/c4d14f8b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
index ba8ac84..3597b54 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
@@ -60,6 +60,7 @@ import com.google.datastore.v1.Query;
 import com.google.datastore.v1.QueryResultBatch;
 import com.google.datastore.v1.RunQueryRequest;
 import com.google.datastore.v1.RunQueryResponse;
+import com.google.datastore.v1.Value;
 import com.google.datastore.v1.client.Datastore;
 import com.google.datastore.v1.client.DatastoreException;
 import com.google.datastore.v1.client.QuerySplitter;
@@ -645,6 +646,39 @@ public class DatastoreV1Test {
   }
 
   /**
+   * Tests that {@link DatastoreWriterFn} splits large entities into batches by serialized size.
+   */
+  @Test
+  public void testDatatoreWriterFnWithLargeEntities() throws Exception {
+    List<Mutation> mutations = new ArrayList<>();
+    for (int i = 0; i < 12; ++i) {
+      Entity.Builder entity = Entity.newBuilder().setKey(makeKey("key" + i, i + 1));
+      entity.putProperties("long", Value.newBuilder().setStringValue(new String(new char[1_000_000])
+            ).setExcludeFromIndexes(true).build());
+      mutations.add(makeUpsert(entity.build()).build());
+    }
+
+    DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(StaticValueProvider.of(PROJECT_ID),
+        null, mockDatastoreFactory);
+    DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
+    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+    doFnTester.processBundle(mutations);
+
+    // Each mutation holds a 1M-char string, so byte-based flushing gives 3 requests of 4. This
+    // is over-specific: the real requirement is only that each CommitRequest be under 10MB.
+    int start = 0;
+    while (start < mutations.size()) {
+      int end = Math.min(mutations.size(), start + 4);
+      CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+      commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+      commitRequest.addAllMutations(mutations.subList(start, end));
+      // Verify a non-transactional commit was issued for exactly this batch of mutations.
+      verify(mockDatastore).commit(commitRequest.build());
+      start = end;
+    }
+  }
+
+  /**
    * Tests {@link DatastoreV1.Read#getEstimatedSizeBytes} to fetch and return estimated size for a
    * query.
    */