You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/05/19 20:22:32 UTC
[3/4] beam git commit: Comply with byte limit for Datastore Commit.
Comply with byte limit for Datastore Commit.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c4d14f8b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c4d14f8b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c4d14f8b
Branch: refs/heads/master
Commit: c4d14f8be7bb72fd653f1ab8e8080fc2b65f6672
Parents: ce00d24
Author: Colin Phipps <cp...@moria.org.uk>
Authored: Tue Apr 25 15:28:28 2017 +0000
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri May 19 13:11:17 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/datastore/DatastoreV1.java | 16 +++++++++
.../sdk/io/gcp/datastore/DatastoreV1Test.java | 34 ++++++++++++++++++++
2 files changed, 50 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c4d14f8b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index 16bb1b4..4cfb801 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -208,6 +208,13 @@ public class DatastoreV1 {
static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
/**
+ * Cloud Datastore has a limit of 10MB per RPC, so we also flush a batch before adding a mutation
+ * would bring its accumulated serialized size to this limit or beyond. This is set below 10MB
+ * because it only accounts for the mutations themselves, not the CommitRequest wrapper.
+ */
+ static final int DATASTORE_BATCH_UPDATE_BYTES_LIMIT = 5_000_000;
+
+ /**
* Returns an empty {@link DatastoreV1.Read} builder. Configure the source {@code projectId},
* {@code query}, and optionally {@code namespace} and {@code numQuerySplits} using
* {@link DatastoreV1.Read#withProjectId}, {@link DatastoreV1.Read#withQuery},
@@ -1123,6 +1130,7 @@ public class DatastoreV1 {
private final V1DatastoreFactory datastoreFactory;
// Current batch of mutations to be written.
private final List<Mutation> mutations = new ArrayList<>();
+ private int mutationsSize = 0; // Sum of getSerializedSize() over the current batch's mutations.
private static final int MAX_RETRIES = 5;
private static final FluentBackoff BUNDLE_WRITE_BACKOFF =
@@ -1152,7 +1160,14 @@ public class DatastoreV1 {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
+ Mutation write = c.element();
+ int size = write.getSerializedSize();
+ if (mutations.size() > 0
+ && mutationsSize + size >= DatastoreV1.DATASTORE_BATCH_UPDATE_BYTES_LIMIT) {
+ flushBatch();
+ }
mutations.add(c.element());
+ mutationsSize += size;
if (mutations.size() >= DatastoreV1.DATASTORE_BATCH_UPDATE_LIMIT) {
flushBatch();
}
@@ -1203,6 +1218,7 @@ public class DatastoreV1 {
}
LOG.debug("Successfully wrote {} mutations", mutations.size());
mutations.clear();
+ mutationsSize = 0;
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/c4d14f8b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
index ba8ac84..3597b54 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
@@ -60,6 +60,7 @@ import com.google.datastore.v1.Query;
import com.google.datastore.v1.QueryResultBatch;
import com.google.datastore.v1.RunQueryRequest;
import com.google.datastore.v1.RunQueryResponse;
+import com.google.datastore.v1.Value;
import com.google.datastore.v1.client.Datastore;
import com.google.datastore.v1.client.DatastoreException;
import com.google.datastore.v1.client.QuerySplitter;
@@ -645,6 +646,39 @@ public class DatastoreV1Test {
}
/**
+ * Tests {@link DatastoreWriterFn} with large entities that must be split across several batches.
+ */
+ @Test
+ public void testDatastoreWriterFnWithLargeEntities() throws Exception {
+ List<Mutation> mutations = new ArrayList<>();
+ for (int i = 0; i < 12; ++i) {
+ Entity.Builder entity = Entity.newBuilder().setKey(makeKey("key" + i, i + 1));
+ entity.putProperties("long", Value.newBuilder().setStringValue(new String(new char[1_000_000]))
+ .setExcludeFromIndexes(true).build());
+ mutations.add(makeUpsert(entity.build()).build());
+ }
+
+ DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(StaticValueProvider.of(PROJECT_ID),
+ null, mockDatastoreFactory);
+ DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
+ doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+ doFnTester.processBundle(mutations);
+
+ // This test is over-specific currently; it requires that we split the 12 entity writes into 3
+ // requests of 4, but we only need each CommitRequest to be less than 10MB in size.
+ int start = 0;
+ while (start < mutations.size()) {
+ int end = Math.min(mutations.size(), start + 4);
+ CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+ commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+ commitRequest.addAllMutations(mutations.subList(start, end));
+ // Verify all the batch requests were made with the expected mutations.
+ verify(mockDatastore).commit(commitRequest.build());
+ start = end;
+ }
+ }
+
+ /**
* Tests {@link DatastoreV1.Read#getEstimatedSizeBytes} to fetch and return estimated size for a
* query.
*/