You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/05/19 20:22:30 UTC

[1/4] beam git commit: End-to-end test for large entity writes.

Repository: beam
Updated Branches:
  refs/heads/master ce00d2466 -> 9a6baefcd


End-to-end test for large entity writes.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dcf40564
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dcf40564
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dcf40564

Branch: refs/heads/master
Commit: dcf405644e2b51303e1d2c12592fe82ee01eb32f
Parents: de95c7f
Author: Colin Phipps <fi...@google.com>
Authored: Tue May 9 09:40:50 2017 +0000
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri May 19 13:11:17 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/datastore/V1ReadIT.java     |  2 +-
 .../beam/sdk/io/gcp/datastore/V1TestUtil.java   | 15 ++++++--
 .../beam/sdk/io/gcp/datastore/V1WriteIT.java    | 36 ++++++++++++++++++--
 3 files changed, 47 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/dcf40564/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
index ec7fa8f..22945f5 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
@@ -148,7 +148,7 @@ public class V1ReadIT {
     Key ancestorKey = makeAncestorKey(options.getNamespace(), options.getKind(), ancestor);
 
     for (long i = 0; i < numEntities; i++) {
-      Entity entity = makeEntity(i, ancestorKey, options.getKind(), options.getNamespace());
+      Entity entity = makeEntity(i, ancestorKey, options.getKind(), options.getNamespace(), 0);
       writer.write(entity);
     }
     writer.close();

http://git-wip-us.apache.org/repos/asf/beam/blob/dcf40564/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
index dc91638..5e618df 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
@@ -92,8 +92,10 @@ class V1TestUtil {
 
   /**
    * Build an entity for the given ancestorKey, kind, namespace and value.
+   * @param largePropertySize if greater than 0, add an unindexed property of the given size.
    */
-  static Entity makeEntity(Long value, Key ancestorKey, String kind, @Nullable String namespace) {
+  static Entity makeEntity(Long value, Key ancestorKey, String kind, @Nullable String namespace,
+      int largePropertySize) {
     Entity.Builder entityBuilder = Entity.newBuilder();
     Key.Builder keyBuilder = makeKey(ancestorKey, kind, UUID.randomUUID().toString());
     // NOTE: Namespace is not inherited between keys created with DatastoreHelper.makeKey, so
@@ -105,6 +107,10 @@ class V1TestUtil {
 
     entityBuilder.setKey(keyBuilder.build());
     entityBuilder.putProperties("value", makeValue(value).build());
+    if (largePropertySize > 0) {
+      entityBuilder.putProperties("unindexed_value", makeValue(new String(
+          new char[largePropertySize]).replace("\0", "A")).setExcludeFromIndexes(true).build());
+    }
     return entityBuilder.build();
   }
 
@@ -115,18 +121,21 @@ class V1TestUtil {
     private final String kind;
     @Nullable
     private final String namespace;
+    private final int largePropertySize;
     private Key ancestorKey;
 
-    CreateEntityFn(String kind, @Nullable String namespace, String ancestor) {
+    CreateEntityFn(String kind, @Nullable String namespace, String ancestor,
+        int largePropertySize) {
       this.kind = kind;
       this.namespace = namespace;
+      this.largePropertySize = largePropertySize;
       // Build the ancestor key for all created entities once, including the namespace.
       ancestorKey = makeAncestorKey(namespace, kind, ancestor);
     }
 
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
-      c.output(makeEntity(c.element(), ancestorKey, kind, namespace));
+      c.output(makeEntity(c.element(), ancestorKey, kind, namespace, largePropertySize));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/dcf40564/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
index 82e4d64..4a874fd 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
@@ -67,8 +67,7 @@ public class V1WriteIT {
 
     // Write to datastore
     p.apply(GenerateSequence.from(0).to(numEntities))
-        .apply(ParDo.of(new CreateEntityFn(
-            options.getKind(), options.getNamespace(), ancestor)))
+        .apply(ParDo.of(new CreateEntityFn(options.getKind(), options.getNamespace(), ancestor, 0)))
         .apply(DatastoreIO.v1().write().withProjectId(project));
 
     p.run();
@@ -79,6 +78,39 @@ public class V1WriteIT {
     assertEquals(numEntitiesWritten, numEntities);
   }
 
+  /**
+   * An end-to-end test for {@link DatastoreV1.Write}.
+   *
+   * <p>Write some large test entities to Cloud Datastore, to test that a batch is flushed when
+   * the byte size limit is reached. Read and count all the entities. Verify that the count matches
+   * the number of entities written.
+   */
+  @Test
+  public void testE2EV1WriteWithLargeEntities() throws Exception {
+    Pipeline p = Pipeline.create(options);
+
+    /*
+     * Datastore has a limit of 1MB per entity, and 10MB per write RPC. If each entity is around
+     * 1MB in size, then we hit the limit on the size of the write long before we hit the limit on
+     * the number of entities per write.
+     */
+    final int rawPropertySize = 900_000;
+    final int numLargeEntities = 100;
+
+    // Write to datastore
+    p.apply(GenerateSequence.from(0).to(numLargeEntities))
+        .apply(ParDo.of(new CreateEntityFn(
+                options.getKind(), options.getNamespace(), ancestor, rawPropertySize)))
+        .apply(DatastoreIO.v1().write().withProjectId(project));
+
+    p.run();
+
+    // Count number of entities written to datastore.
+    long numEntitiesWritten = countEntities(options, project, ancestor);
+
+    assertEquals(numEntitiesWritten, numLargeEntities);
+  }
+
   @After
   public void tearDown() throws Exception {
     deleteAllEntities(options, project, ancestor);


[2/4] beam git commit: Make the large entities test neater.

Posted by jk...@apache.org.
Make the large entities test neater.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/de95c7f5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/de95c7f5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/de95c7f5

Branch: refs/heads/master
Commit: de95c7f5bf3b8391486b898b0ffb1a0a05338725
Parents: c4d14f8
Author: Colin Phipps <fi...@google.com>
Authored: Tue May 9 15:07:51 2017 +0000
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri May 19 13:11:17 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java    | 1 +
 .../apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java    | 8 +++++---
 2 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/de95c7f5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index 4cfb801..b198a6f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -212,6 +212,7 @@ public class DatastoreV1 {
    * exceeds this limit. This is set lower than the 10MB limit on the RPC, as this only accounts for
    * the mutations themselves and not the CommitRequest wrapper around them.
    */
+  @VisibleForTesting
   static final int DATASTORE_BATCH_UPDATE_BYTES_LIMIT = 5_000_000;
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/de95c7f5/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
index 3597b54..460049e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
@@ -26,6 +26,7 @@ import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
 import static com.google.datastore.v1.client.DatastoreHelper.makeOrder;
 import static com.google.datastore.v1.client.DatastoreHelper.makeUpsert;
 import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
+import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DATASTORE_BATCH_UPDATE_BYTES_LIMIT;
 import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DATASTORE_BATCH_UPDATE_LIMIT;
 import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.DEFAULT_BUNDLE_SIZE_BYTES;
 import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.QUERY_BATCH_LIMIT;
@@ -60,7 +61,6 @@ import com.google.datastore.v1.Query;
 import com.google.datastore.v1.QueryResultBatch;
 import com.google.datastore.v1.RunQueryRequest;
 import com.google.datastore.v1.RunQueryResponse;
-import com.google.datastore.v1.Value;
 import com.google.datastore.v1.client.Datastore;
 import com.google.datastore.v1.client.DatastoreException;
 import com.google.datastore.v1.client.QuerySplitter;
@@ -651,9 +651,10 @@ public class DatastoreV1Test {
   @Test
   public void testDatatoreWriterFnWithLargeEntities() throws Exception {
     List<Mutation> mutations = new ArrayList<>();
+    int propertySize = 900_000;
     for (int i = 0; i < 12; ++i) {
       Entity.Builder entity = Entity.newBuilder().setKey(makeKey("key" + i, i + 1));
-      entity.putProperties("long", Value.newBuilder().setStringValue(new String(new char[1_000_000])
+      entity.putProperties("long", makeValue(new String(new char[propertySize])
             ).setExcludeFromIndexes(true).build());
       mutations.add(makeUpsert(entity.build()).build());
     }
@@ -666,9 +667,10 @@ public class DatastoreV1Test {
 
     // This test is over-specific currently; it requires that we split the 12 entity writes into 3
     // requests, but we only need each CommitRequest to be less than 10MB in size.
+    int propertiesPerRpc = DATASTORE_BATCH_UPDATE_BYTES_LIMIT / propertySize;
     int start = 0;
     while (start < mutations.size()) {
-      int end = Math.min(mutations.size(), start + 4);
+      int end = Math.min(mutations.size(), start + propertiesPerRpc);
       CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
       commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
       commitRequest.addAllMutations(mutations.subList(start, end));


[3/4] beam git commit: Comply with byte limit for Datastore Commit.

Posted by jk...@apache.org.
Comply with byte limit for Datastore Commit.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c4d14f8b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c4d14f8b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c4d14f8b

Branch: refs/heads/master
Commit: c4d14f8be7bb72fd653f1ab8e8080fc2b65f6672
Parents: ce00d24
Author: Colin Phipps <cp...@moria.org.uk>
Authored: Tue Apr 25 15:28:28 2017 +0000
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri May 19 13:11:17 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  | 16 +++++++++
 .../sdk/io/gcp/datastore/DatastoreV1Test.java   | 34 ++++++++++++++++++++
 2 files changed, 50 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c4d14f8b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index 16bb1b4..4cfb801 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -208,6 +208,13 @@ public class DatastoreV1 {
   static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
 
   /**
+   * Cloud Datastore has a limit of 10MB per RPC, so we also flush if the total size of mutations
+   * exceeds this limit. This is set lower than the 10MB limit on the RPC, as this only accounts for
+   * the mutations themselves and not the CommitRequest wrapper around them.
+   */
+  static final int DATASTORE_BATCH_UPDATE_BYTES_LIMIT = 5_000_000;
+
+  /**
    * Returns an empty {@link DatastoreV1.Read} builder. Configure the source {@code projectId},
    * {@code query}, and optionally {@code namespace} and {@code numQuerySplits} using
    * {@link DatastoreV1.Read#withProjectId}, {@link DatastoreV1.Read#withQuery},
@@ -1123,6 +1130,7 @@ public class DatastoreV1 {
     private final V1DatastoreFactory datastoreFactory;
     // Current batch of mutations to be written.
     private final List<Mutation> mutations = new ArrayList<>();
+    private int mutationsSize = 0;  // Accumulated size of protos in mutations.
 
     private static final int MAX_RETRIES = 5;
     private static final FluentBackoff BUNDLE_WRITE_BACKOFF =
@@ -1152,7 +1160,14 @@ public class DatastoreV1 {
 
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
+      Mutation write = c.element();
+      int size = write.getSerializedSize();
+      if (mutations.size() > 0
+          && mutationsSize + size >= DatastoreV1.DATASTORE_BATCH_UPDATE_BYTES_LIMIT) {
+        flushBatch();
+      }
       mutations.add(c.element());
+      mutationsSize += size;
       if (mutations.size() >= DatastoreV1.DATASTORE_BATCH_UPDATE_LIMIT) {
         flushBatch();
       }
@@ -1203,6 +1218,7 @@ public class DatastoreV1 {
       }
       LOG.debug("Successfully wrote {} mutations", mutations.size());
       mutations.clear();
+      mutationsSize = 0;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/c4d14f8b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
index ba8ac84..3597b54 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
@@ -60,6 +60,7 @@ import com.google.datastore.v1.Query;
 import com.google.datastore.v1.QueryResultBatch;
 import com.google.datastore.v1.RunQueryRequest;
 import com.google.datastore.v1.RunQueryResponse;
+import com.google.datastore.v1.Value;
 import com.google.datastore.v1.client.Datastore;
 import com.google.datastore.v1.client.DatastoreException;
 import com.google.datastore.v1.client.QuerySplitter;
@@ -645,6 +646,39 @@ public class DatastoreV1Test {
   }
 
   /**
+   * Tests {@link DatastoreWriterFn} with large entities that need to be split into more batches.
+   */
+  @Test
+  public void testDatatoreWriterFnWithLargeEntities() throws Exception {
+    List<Mutation> mutations = new ArrayList<>();
+    for (int i = 0; i < 12; ++i) {
+      Entity.Builder entity = Entity.newBuilder().setKey(makeKey("key" + i, i + 1));
+      entity.putProperties("long", Value.newBuilder().setStringValue(new String(new char[1_000_000])
+            ).setExcludeFromIndexes(true).build());
+      mutations.add(makeUpsert(entity.build()).build());
+    }
+
+    DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(StaticValueProvider.of(PROJECT_ID),
+        null, mockDatastoreFactory);
+    DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
+    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+    doFnTester.processBundle(mutations);
+
+    // This test is over-specific currently; it requires that we split the 12 entity writes into 3
+    // requests, but we only need each CommitRequest to be less than 10MB in size.
+    int start = 0;
+    while (start < mutations.size()) {
+      int end = Math.min(mutations.size(), start + 4);
+      CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+      commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+      commitRequest.addAllMutations(mutations.subList(start, end));
+      // Verify all the batch requests were made with the expected mutations.
+      verify(mockDatastore).commit(commitRequest.build());
+      start = end;
+    }
+  }
+
+  /**
    * Tests {@link DatastoreV1.Read#getEstimatedSizeBytes} to fetch and return estimated size for a
    * query.
    */


[4/4] beam git commit: This closes #2948: [BEAM-991] Comply with byte limit for Datastore Commit.

Posted by jk...@apache.org.
This closes #2948: [BEAM-991] Comply with byte limit for Datastore Commit.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9a6baefc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9a6baefc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9a6baefc

Branch: refs/heads/master
Commit: 9a6baefcd8668b145e86140f64b9d18c1c21890c
Parents: ce00d24 dcf4056
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri May 19 13:11:50 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri May 19 13:11:50 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  | 17 +++++++++
 .../sdk/io/gcp/datastore/DatastoreV1Test.java   | 36 ++++++++++++++++++++
 .../beam/sdk/io/gcp/datastore/V1ReadIT.java     |  2 +-
 .../beam/sdk/io/gcp/datastore/V1TestUtil.java   | 15 ++++++--
 .../beam/sdk/io/gcp/datastore/V1WriteIT.java    | 36 ++++++++++++++++++--
 5 files changed, 100 insertions(+), 6 deletions(-)
----------------------------------------------------------------------