You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/05/19 20:22:30 UTC
[1/4] beam git commit: End-to-end test for large entity writes.
Repository: beam
Updated Branches:
refs/heads/master ce00d2466 -> 9a6baefcd
End-to-end test for large entity writes.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dcf40564
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dcf40564
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dcf40564
Branch: refs/heads/master
Commit: dcf405644e2b51303e1d2c12592fe82ee01eb32f
Parents: de95c7f
Author: Colin Phipps <fi...@google.com>
Authored: Tue May 9 09:40:50 2017 +0000
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri May 19 13:11:17 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/datastore/V1ReadIT.java | 2 +-
.../beam/sdk/io/gcp/datastore/V1TestUtil.java | 15 ++++++--
.../beam/sdk/io/gcp/datastore/V1WriteIT.java | 36 ++++++++++++++++++--
3 files changed, 47 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/dcf40564/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
index ec7fa8f..22945f5 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
@@ -148,7 +148,7 @@ public class V1ReadIT {
Key ancestorKey = makeAncestorKey(options.getNamespace(), options.getKind(), ancestor);
for (long i = 0; i < numEntities; i++) {
- Entity entity = makeEntity(i, ancestorKey, options.getKind(), options.getNamespace());
+ Entity entity = makeEntity(i, ancestorKey, options.getKind(), options.getNamespace(), 0);
writer.write(entity);
}
writer.close();
http://git-wip-us.apache.org/repos/asf/beam/blob/dcf40564/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
index dc91638..5e618df 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
@@ -92,8 +92,10 @@ class V1TestUtil {
/**
* Build an entity for the given ancestorKey, kind, namespace and value.
+ * @param largePropertySize if greater than 0, add an unindexed property of the given size.
*/
- static Entity makeEntity(Long value, Key ancestorKey, String kind, @Nullable String namespace) {
+ static Entity makeEntity(Long value, Key ancestorKey, String kind, @Nullable String namespace,
+ int largePropertySize) {
Entity.Builder entityBuilder = Entity.newBuilder();
Key.Builder keyBuilder = makeKey(ancestorKey, kind, UUID.randomUUID().toString());
// NOTE: Namespace is not inherited between keys created with DatastoreHelper.makeKey, so
@@ -105,6 +107,10 @@ class V1TestUtil {
entityBuilder.setKey(keyBuilder.build());
entityBuilder.putProperties("value", makeValue(value).build());
+ if (largePropertySize > 0) {
+ entityBuilder.putProperties("unindexed_value", makeValue(new String(
+ new char[largePropertySize]).replace("\0", "A")).setExcludeFromIndexes(true).build());
+ }
return entityBuilder.build();
}
@@ -115,18 +121,21 @@ class V1TestUtil {
private final String kind;
@Nullable
private final String namespace;
+ private final int largePropertySize;
private Key ancestorKey;
- CreateEntityFn(String kind, @Nullable String namespace, String ancestor) {
+ CreateEntityFn(String kind, @Nullable String namespace, String ancestor,
+ int largePropertySize) {
this.kind = kind;
this.namespace = namespace;
+ this.largePropertySize = largePropertySize;
// Build the ancestor key for all created entities once, including the namespace.
ancestorKey = makeAncestorKey(namespace, kind, ancestor);
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
- c.output(makeEntity(c.element(), ancestorKey, kind, namespace));
+ c.output(makeEntity(c.element(), ancestorKey, kind, namespace, largePropertySize));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/dcf40564/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
index 82e4d64..4a874fd 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
@@ -67,8 +67,7 @@ public class V1WriteIT {
// Write to datastore
p.apply(GenerateSequence.from(0).to(numEntities))
- .apply(ParDo.of(new CreateEntityFn(
- options.getKind(), options.getNamespace(), ancestor)))
+ .apply(ParDo.of(new CreateEntityFn(options.getKind(), options.getNamespace(), ancestor, 0)))
.apply(DatastoreIO.v1().write().withProjectId(project));
p.run();
@@ -79,6 +78,39 @@ public class V1WriteIT {
assertEquals(numEntitiesWritten, numEntities);
}
+ /**
+ * An end-to-end test for {@link DatastoreV1.Write}.
+ *
+ * <p>Write some large test entities to Cloud Datastore, to test that a batch is flushed when
+ * the byte size limit is reached. Read and count all the entities. Verify that the count matches
+ * the number of entities written.
+ */
+ @Test
+ public void testE2EV1WriteWithLargeEntities() throws Exception {
+ Pipeline p = Pipeline.create(options);
+
+ /*
+ * Datastore has a limit of 1MB per entity, and 10MB per write RPC. If each entity is around
+ * 1MB in size, then we hit the limit on the size of the write long before we hit the limit on
+ * the number of entities per write.
+ */
+ final int rawPropertySize = 900_000;
+ final int numLargeEntities = 100;
+
+ // Write to datastore
+ p.apply(GenerateSequence.from(0).to(numLargeEntities))
+ .apply(ParDo.of(new CreateEntityFn(
+ options.getKind(), options.getNamespace(), ancestor, rawPropertySize)))
+ .apply(DatastoreIO.v1().write().withProjectId(project));
+
+ p.run();
+
+ // Count number of entities written to datastore.
+ long numEntitiesWritten = countEntities(options, project, ancestor);
+
+ assertEquals(numEntitiesWritten, numLargeEntities);
+ }
+
@After
public void tearDown() throws Exception {
deleteAllEntities(options, project, ancestor);
[2/4] beam git commit: Make the large entities test neater.
Posted by jk...@apache.org.
Make the large entities test neater.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/de95c7f5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/de95c7f5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/de95c7f5
Branch: refs/heads/master
Commit: de95c7f5bf3b8391486b898b0ffb1a0a05338725
Parents: c4d14f8
Author: Colin Phipps <fi...@google.com>
Authored: Tue May 9 15:07:51 2017 +0000
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri May 19 13:11:17 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java | 1 +
.../apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java | 8 +++++---
2 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/de95c7f5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index 4cfb801..b198a6f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -212,6 +212,7 @@ public class DatastoreV1 {
* exceeds this limit. This is set lower than the 10MB limit on the RPC, as this only accounts for
* the mutations themselves and not the CommitRequest wrapper around them.
*/
+ @VisibleForTesting
static final int DATASTORE_BATCH_UPDATE_BYTES_LIMIT = 5_000_000;
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/de95c7f5/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
index 3597b54..460049e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
@@ -26,6 +26,7 @@ import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
import static com.google.datastore.v1.client.DatastoreHelper.makeOrder;
import static com.google.datastore.v1.client.DatastoreHelper.makeUpsert;
import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
+import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DATASTORE_BATCH_UPDATE_BYTES_LIMIT;
import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DATASTORE_BATCH_UPDATE_LIMIT;
import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.DEFAULT_BUNDLE_SIZE_BYTES;
import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.QUERY_BATCH_LIMIT;
@@ -60,7 +61,6 @@ import com.google.datastore.v1.Query;
import com.google.datastore.v1.QueryResultBatch;
import com.google.datastore.v1.RunQueryRequest;
import com.google.datastore.v1.RunQueryResponse;
-import com.google.datastore.v1.Value;
import com.google.datastore.v1.client.Datastore;
import com.google.datastore.v1.client.DatastoreException;
import com.google.datastore.v1.client.QuerySplitter;
@@ -651,9 +651,10 @@ public class DatastoreV1Test {
@Test
public void testDatatoreWriterFnWithLargeEntities() throws Exception {
List<Mutation> mutations = new ArrayList<>();
+ int propertySize = 900_000;
for (int i = 0; i < 12; ++i) {
Entity.Builder entity = Entity.newBuilder().setKey(makeKey("key" + i, i + 1));
- entity.putProperties("long", Value.newBuilder().setStringValue(new String(new char[1_000_000])
+ entity.putProperties("long", makeValue(new String(new char[propertySize])
).setExcludeFromIndexes(true).build());
mutations.add(makeUpsert(entity.build()).build());
}
@@ -666,9 +667,10 @@ public class DatastoreV1Test {
// This test is over-specific currently; it requires that we split the 12 entity writes into 3
// requests, but we only need each CommitRequest to be less than 10MB in size.
+ int propertiesPerRpc = DATASTORE_BATCH_UPDATE_BYTES_LIMIT / propertySize;
int start = 0;
while (start < mutations.size()) {
- int end = Math.min(mutations.size(), start + 4);
+ int end = Math.min(mutations.size(), start + propertiesPerRpc);
CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
commitRequest.addAllMutations(mutations.subList(start, end));
[3/4] beam git commit: Comply with byte limit for Datastore Commit.
Posted by jk...@apache.org.
Comply with byte limit for Datastore Commit.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c4d14f8b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c4d14f8b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c4d14f8b
Branch: refs/heads/master
Commit: c4d14f8be7bb72fd653f1ab8e8080fc2b65f6672
Parents: ce00d24
Author: Colin Phipps <cp...@moria.org.uk>
Authored: Tue Apr 25 15:28:28 2017 +0000
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri May 19 13:11:17 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/datastore/DatastoreV1.java | 16 +++++++++
.../sdk/io/gcp/datastore/DatastoreV1Test.java | 34 ++++++++++++++++++++
2 files changed, 50 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c4d14f8b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index 16bb1b4..4cfb801 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -208,6 +208,13 @@ public class DatastoreV1 {
static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
/**
+ * Cloud Datastore has a limit of 10MB per RPC, so we also flush if the total size of mutations
+ * exceeds this limit. This is set lower than the 10MB limit on the RPC, as this only accounts for
+ * the mutations themselves and not the CommitRequest wrapper around them.
+ */
+ static final int DATASTORE_BATCH_UPDATE_BYTES_LIMIT = 5_000_000;
+
+ /**
* Returns an empty {@link DatastoreV1.Read} builder. Configure the source {@code projectId},
* {@code query}, and optionally {@code namespace} and {@code numQuerySplits} using
* {@link DatastoreV1.Read#withProjectId}, {@link DatastoreV1.Read#withQuery},
@@ -1123,6 +1130,7 @@ public class DatastoreV1 {
private final V1DatastoreFactory datastoreFactory;
// Current batch of mutations to be written.
private final List<Mutation> mutations = new ArrayList<>();
+ private int mutationsSize = 0; // Accumulated size of protos in mutations.
private static final int MAX_RETRIES = 5;
private static final FluentBackoff BUNDLE_WRITE_BACKOFF =
@@ -1152,7 +1160,14 @@ public class DatastoreV1 {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
+ Mutation write = c.element();
+ int size = write.getSerializedSize();
+ if (mutations.size() > 0
+ && mutationsSize + size >= DatastoreV1.DATASTORE_BATCH_UPDATE_BYTES_LIMIT) {
+ flushBatch();
+ }
mutations.add(c.element());
+ mutationsSize += size;
if (mutations.size() >= DatastoreV1.DATASTORE_BATCH_UPDATE_LIMIT) {
flushBatch();
}
@@ -1203,6 +1218,7 @@ public class DatastoreV1 {
}
LOG.debug("Successfully wrote {} mutations", mutations.size());
mutations.clear();
+ mutationsSize = 0;
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/c4d14f8b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
index ba8ac84..3597b54 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
@@ -60,6 +60,7 @@ import com.google.datastore.v1.Query;
import com.google.datastore.v1.QueryResultBatch;
import com.google.datastore.v1.RunQueryRequest;
import com.google.datastore.v1.RunQueryResponse;
+import com.google.datastore.v1.Value;
import com.google.datastore.v1.client.Datastore;
import com.google.datastore.v1.client.DatastoreException;
import com.google.datastore.v1.client.QuerySplitter;
@@ -645,6 +646,39 @@ public class DatastoreV1Test {
}
/**
+ * Tests {@link DatastoreWriterFn} with large entities that need to be split into more batches.
+ */
+ @Test
+ public void testDatatoreWriterFnWithLargeEntities() throws Exception {
+ List<Mutation> mutations = new ArrayList<>();
+ for (int i = 0; i < 12; ++i) {
+ Entity.Builder entity = Entity.newBuilder().setKey(makeKey("key" + i, i + 1));
+ entity.putProperties("long", Value.newBuilder().setStringValue(new String(new char[1_000_000])
+ ).setExcludeFromIndexes(true).build());
+ mutations.add(makeUpsert(entity.build()).build());
+ }
+
+ DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(StaticValueProvider.of(PROJECT_ID),
+ null, mockDatastoreFactory);
+ DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
+ doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+ doFnTester.processBundle(mutations);
+
+ // This test is over-specific currently; it requires that we split the 12 entity writes into 3
+ // requests, but we only need each CommitRequest to be less than 10MB in size.
+ int start = 0;
+ while (start < mutations.size()) {
+ int end = Math.min(mutations.size(), start + 4);
+ CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+ commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+ commitRequest.addAllMutations(mutations.subList(start, end));
+ // Verify all the batch requests were made with the expected mutations.
+ verify(mockDatastore).commit(commitRequest.build());
+ start = end;
+ }
+ }
+
+ /**
* Tests {@link DatastoreV1.Read#getEstimatedSizeBytes} to fetch and return estimated size for a
* query.
*/
[4/4] beam git commit: This closes #2948: [BEAM-991] Comply with byte limit for Datastore Commit.
Posted by jk...@apache.org.
This closes #2948: [BEAM-991] Comply with byte limit for Datastore Commit.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9a6baefc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9a6baefc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9a6baefc
Branch: refs/heads/master
Commit: 9a6baefcd8668b145e86140f64b9d18c1c21890c
Parents: ce00d24 dcf4056
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri May 19 13:11:50 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri May 19 13:11:50 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/datastore/DatastoreV1.java | 17 +++++++++
.../sdk/io/gcp/datastore/DatastoreV1Test.java | 36 ++++++++++++++++++++
.../beam/sdk/io/gcp/datastore/V1ReadIT.java | 2 +-
.../beam/sdk/io/gcp/datastore/V1TestUtil.java | 15 ++++++--
.../beam/sdk/io/gcp/datastore/V1WriteIT.java | 36 ++++++++++++++++++--
5 files changed, 100 insertions(+), 6 deletions(-)
----------------------------------------------------------------------