Posted to commits@beam.apache.org by lc...@apache.org on 2017/06/22 20:34:21 UTC

[1/2] beam git commit: [BEAM-2439] Dynamic sizing of Datastore write RPCs

Repository: beam
Updated Branches:
  refs/heads/master 3dc454a9b -> 9ed0af8f2


[BEAM-2439] Dynamic sizing of Datastore write RPCs

This stops the Datastore connector from always sending 500 entities per RPC.
Instead, it starts at a lower count that is more likely to complete within
the deadline even in adverse conditions, and then grows or shrinks the
batch size in response to the measured latency of past requests.
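
In outline: the connector keeps a rolling mean of the observed latency per
entity and sizes the next request so that it should complete within a fixed
latency target, clamped to hard lower and upper bounds. A minimal sketch of
that rule, with the shape and constants taken from
WriteBatcherImpl.nextBatchSize in the diff below:

    // Sketch of the adaptive sizing rule (mirrors WriteBatcherImpl.nextBatchSize).
    static int nextBatchSize(long meanLatencyPerEntityMs) {
      long perEntity = Math.max(meanLatencyPerEntityMs, 1);  // guard against division by zero
      // Aim for a 5000 ms RPC, but stay within [10, 500] entities per request.
      return (int) Math.max(10, Math.min(500, 5000 / perEntity));
    }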


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0292a24f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0292a24f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0292a24f

Branch: refs/heads/master
Commit: 0292a24f9c88796542bff55031d84c11f0ab6b16
Parents: 3dc454a
Author: Colin Phipps <fi...@google.com>
Authored: Mon May 15 14:18:16 2017 +0000
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Jun 22 13:33:52 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  | 124 ++++++++++++++++---
 .../sdk/io/gcp/datastore/MovingAverage.java     |  50 ++++++++
 .../sdk/io/gcp/datastore/DatastoreV1Test.java   |  72 ++++++++++-
 3 files changed, 225 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0292a24f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index 06b9c8a..e67f4b2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -201,11 +201,31 @@ public class DatastoreV1 {
   DatastoreV1() {}
 
   /**
-   * Cloud Datastore has a limit of 500 mutations per batch operation, so we flush
-   * changes to Datastore every 500 entities.
+   * The number of entity updates written per RPC, initially. We buffer updates in the connector and
+   * write a batch to Datastore once we have collected a certain number. This is the initial batch
+   * size; it is adjusted at runtime based on the performance of previous writes (see {@link
+   * DatastoreV1.WriteBatcher}).
+   *
+   * <p>Testing has found that a batch of 200 entities will generally finish within the timeout even
+   * in adverse conditions.
+   */
+  @VisibleForTesting
+  static final int DATASTORE_BATCH_UPDATE_ENTITIES_START = 200;
+
+  /**
+   * When choosing the number of updates in a single RPC, never exceed the maximum allowed by the
+   * API.
    */
   @VisibleForTesting
-  static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
+  static final int DATASTORE_BATCH_UPDATE_ENTITIES_LIMIT = 500;
+
+  /**
+   * When choosing the number of updates in a single RPC, do not go below this value.  The actual
+   * number of entities per request may be lower when we flush for the end of a bundle or if we hit
+   * {@link DatastoreV1#DATASTORE_BATCH_UPDATE_BYTES_LIMIT}.
+   */
+  @VisibleForTesting
+  static final int DATASTORE_BATCH_UPDATE_ENTITIES_MIN = 10;
 
   /**
    * Cloud Datastore has a limit of 10MB per RPC, so we also flush if the total size of mutations
@@ -1107,18 +1127,74 @@ public class DatastoreV1 {
     }
   }
 
+  /** Determines batch sizes for commit RPCs. */
+  @VisibleForTesting
+  interface WriteBatcher {
+    /** Call before using this WriteBatcher. */
+    void start();
+
+    /**
+     * Reports the latency of a previous commit RPC, and the number of mutations that it contained.
+     */
+    void addRequestLatency(long timeSinceEpochMillis, long latencyMillis, int numMutations);
+
+    /** Returns the number of entities to include in the next CommitRequest. */
+    int nextBatchSize(long timeSinceEpochMillis);
+  }
+
+  /**
+   * Determines batch sizes for commit RPCs based on past performance.
+   *
+   * <p>It aims for a target response time per RPC: it uses the response times for previous RPCs
+   * and the number of entities contained in them, calculates a rolling average time-per-entity, and
+   * chooses the number of entities for future writes to hit the target time.
+   *
+   * <p>This enables us to send large batches without sending over-large requests in the case of
+   * expensive entity writes that may time out before the server can apply them all.
+   */
+  @VisibleForTesting
+  static class WriteBatcherImpl implements WriteBatcher, Serializable {
+    /** Target time per RPC for writes. */
+    static final int DATASTORE_BATCH_TARGET_LATENCY_MS = 5000;
+
+    @Override
+    public void start() {
+      meanLatencyPerEntityMs = new MovingAverage(
+          120000 /* sample period 2 minutes */, 10000 /* sample interval 10s */,
+          1 /* numSignificantBuckets */, 1 /* numSignificantSamples */);
+    }
+
+    @Override
+    public void addRequestLatency(long timeSinceEpochMillis, long latencyMillis, int numMutations) {
+      meanLatencyPerEntityMs.add(timeSinceEpochMillis, latencyMillis / numMutations);
+    }
+
+    @Override
+    public int nextBatchSize(long timeSinceEpochMillis) {
+      if (!meanLatencyPerEntityMs.hasValue(timeSinceEpochMillis)) {
+        return DATASTORE_BATCH_UPDATE_ENTITIES_START;
+      }
+      long recentMeanLatency = Math.max(meanLatencyPerEntityMs.get(timeSinceEpochMillis), 1);
+      return (int) Math.max(DATASTORE_BATCH_UPDATE_ENTITIES_MIN,
+          Math.min(DATASTORE_BATCH_UPDATE_ENTITIES_LIMIT,
+            DATASTORE_BATCH_TARGET_LATENCY_MS / recentMeanLatency));
+    }
+
+    private transient MovingAverage meanLatencyPerEntityMs;
+  }
+
   /**
    * {@link DoFn} that writes {@link Mutation}s to Cloud Datastore. Mutations are written in
-   * batches, where the maximum batch size is {@link DatastoreV1#DATASTORE_BATCH_UPDATE_LIMIT}.
+   * batches; see {@link DatastoreV1.WriteBatcherImpl}.
    *
    * <p>See <a
    * href="https://cloud.google.com/datastore/docs/concepts/entities">
    * Datastore: Entities, Properties, and Keys</a> for information about entity keys and mutations.
    *
    * <p>Commits are non-transactional.  If a commit fails because of a conflict over an entity
-   * group, the commit will be retried (up to {@link DatastoreV1#DATASTORE_BATCH_UPDATE_LIMIT}
+   * group, the commit will be retried (up to {@link DatastoreV1.DatastoreWriterFn#MAX_RETRIES}
    * times). This means that the mutation operation should be idempotent. Thus, the writer should
-   * only be used for {code upsert} and {@code delete} mutation operations, as these are the only
+   * only be used for {@code upsert} and {@code delete} mutation operations, as these are the only
    * two Cloud Datastore mutations that are idempotent.
    */
   @VisibleForTesting
@@ -1132,6 +1208,7 @@ public class DatastoreV1 {
     // Current batch of mutations to be written.
     private final List<Mutation> mutations = new ArrayList<>();
     private int mutationsSize = 0;  // Accumulated size of protos in mutations.
+    private WriteBatcher writeBatcher;
 
     private static final int MAX_RETRIES = 5;
     private static final FluentBackoff BUNDLE_WRITE_BACKOFF =
@@ -1139,24 +1216,27 @@ public class DatastoreV1 {
             .withMaxRetries(MAX_RETRIES).withInitialBackoff(Duration.standardSeconds(5));
 
     DatastoreWriterFn(String projectId, @Nullable String localhost) {
-      this(StaticValueProvider.of(projectId), localhost, new V1DatastoreFactory());
+      this(StaticValueProvider.of(projectId), localhost, new V1DatastoreFactory(),
+          new WriteBatcherImpl());
     }
 
     DatastoreWriterFn(ValueProvider<String> projectId, @Nullable String localhost) {
-      this(projectId, localhost, new V1DatastoreFactory());
+      this(projectId, localhost, new V1DatastoreFactory(), new WriteBatcherImpl());
     }
 
     @VisibleForTesting
     DatastoreWriterFn(ValueProvider<String> projectId, @Nullable String localhost,
-        V1DatastoreFactory datastoreFactory) {
+        V1DatastoreFactory datastoreFactory, WriteBatcher writeBatcher) {
       this.projectId = checkNotNull(projectId, "projectId");
       this.localhost = localhost;
       this.datastoreFactory = datastoreFactory;
+      this.writeBatcher = writeBatcher;
     }
 
     @StartBundle
     public void startBundle(StartBundleContext c) {
       datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId.get(), localhost);
+      writeBatcher.start();
     }
 
     @ProcessElement
@@ -1169,7 +1249,7 @@ public class DatastoreV1 {
       }
       mutations.add(c.element());
       mutationsSize += size;
-      if (mutations.size() >= DatastoreV1.DATASTORE_BATCH_UPDATE_LIMIT) {
+      if (mutations.size() >= writeBatcher.nextBatchSize(System.currentTimeMillis())) {
         flushBatch();
       }
     }
@@ -1199,18 +1279,32 @@ public class DatastoreV1 {
 
       while (true) {
         // Batch upsert entities.
+        CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+        commitRequest.addAllMutations(mutations);
+        commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+        long startTime = System.currentTimeMillis(), endTime;
+
         try {
-          CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
-          commitRequest.addAllMutations(mutations);
-          commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
           datastore.commit(commitRequest.build());
+          endTime = System.currentTimeMillis();
+
+          writeBatcher.addRequestLatency(endTime, endTime - startTime, mutations.size());
+
           // Break if the commit threw no exception.
           break;
         } catch (DatastoreException exception) {
+          if (exception.getCode() == Code.DEADLINE_EXCEEDED) {
+            /* Most errors are not related to request size, and should not change our expectation of
+             * the latency of successful requests. DEADLINE_EXCEEDED can be taken into
+             * consideration, though. */
+            endTime = System.currentTimeMillis();
+            writeBatcher.addRequestLatency(endTime, endTime - startTime, mutations.size());
+          }
+
           // Only log the code and message for potentially-transient errors. The entire exception
           // will be propagated upon the last retry.
-          LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
-              exception.getMessage());
+          LOG.error("Error writing batch of {} mutations to Datastore ({}): {}", mutations.size(),
+              exception.getCode(), exception.getMessage());
           if (!BackOffUtils.next(sleeper, backoff)) {
             LOG.error("Aborting after {} retries.", MAX_RETRIES);
             throw exception;

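To make the feedback loop above concrete, the following sketch walks the
batcher through the slow-query scenario exercised by
testWriteBatcherSlowQueries in the test file further down (same values, and
the same package-private access the unit tests rely on):

    DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
    writeBatcher.start();
    // Two commits of 200 mutations taking 10000 ms each => 50 ms per entity.
    writeBatcher.addRequestLatency(0, 10000, 200);
    writeBatcher.addRequestLatency(0, 10000, 200);
    // 5000 ms target / 50 ms per entity => 100 entities in the next request.
    assertEquals(100, writeBatcher.nextBatchSize(0));
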
http://git-wip-us.apache.org/repos/asf/beam/blob/0292a24f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/MovingAverage.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/MovingAverage.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/MovingAverage.java
new file mode 100644
index 0000000..0890e79
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/MovingAverage.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.util.MovingFunction;
+
+
+class MovingAverage {
+  private final MovingFunction sum;
+  private final MovingFunction count;
+
+  public MovingAverage(long samplePeriodMs, long sampleUpdateMs,
+                        int numSignificantBuckets, int numSignificantSamples) {
+    sum = new MovingFunction(samplePeriodMs, sampleUpdateMs,
+        numSignificantBuckets, numSignificantSamples, Sum.ofLongs());
+    count = new MovingFunction(samplePeriodMs, sampleUpdateMs,
+        numSignificantBuckets, numSignificantSamples, Sum.ofLongs());
+  }
+
+  public void add(long nowMsSinceEpoch, long value) {
+    sum.add(nowMsSinceEpoch, value);
+    count.add(nowMsSinceEpoch, 1);
+  }
+
+  public long get(long nowMsSinceEpoch) {
+    return sum.get(nowMsSinceEpoch) / count.get(nowMsSinceEpoch);
+  }
+
+  public boolean hasValue(long nowMsSinceEpoch) {
+    return sum.isSignificant() && count.isSignificant()
+      && count.get(nowMsSinceEpoch) > 0;
+  }
+}

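MovingAverage pairs two MovingFunction aggregations over the same sliding
window: a sum of the reported values and a count of reports, whose quotient
is the windowed mean. A hypothetical usage from within the same package,
with the window parameters that WriteBatcherImpl passes in:

    MovingAverage avg = new MovingAverage(
        120000 /* sample period 2 minutes */, 10000 /* sample interval 10s */,
        1 /* numSignificantBuckets */, 1 /* numSignificantSamples */);
    long now = System.currentTimeMillis();
    avg.add(now, 25);  // record a 25 ms/entity observation
    avg.add(now, 35);  // and a 35 ms/entity observation
    if (avg.hasValue(now)) {
      long mean = avg.get(now);  // (25 + 35) / 2 = 30
    }
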
http://git-wip-us.apache.org/repos/asf/beam/blob/0292a24f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
index 229b1fb..946887c 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
@@ -27,7 +27,7 @@ import static com.google.datastore.v1.client.DatastoreHelper.makeOrder;
 import static com.google.datastore.v1.client.DatastoreHelper.makeUpsert;
 import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
 import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DATASTORE_BATCH_UPDATE_BYTES_LIMIT;
-import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DATASTORE_BATCH_UPDATE_LIMIT;
+import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START;
 import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.DEFAULT_BUNDLE_SIZE_BYTES;
 import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.QUERY_BATCH_LIMIT;
 import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.getEstimatedSizeBytes;
@@ -606,7 +606,7 @@ public class DatastoreV1Test {
  /** Tests {@link DatastoreWriterFn} with entities spanning more than one batch, but not an exact multiple. */
   @Test
   public void testDatatoreWriterFnWithMultipleBatches() throws Exception {
-    datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 3 + 100);
+    datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_ENTITIES_START * 3 + 100);
   }
 
   /**
@@ -615,7 +615,7 @@ public class DatastoreV1Test {
    */
   @Test
   public void testDatatoreWriterFnWithBatchesExactMultiple() throws Exception {
-    datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 2);
+    datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_ENTITIES_START * 2);
   }
 
   // A helper method to test DatastoreWriterFn for various batch sizes.
@@ -628,14 +628,14 @@ public class DatastoreV1Test {
     }
 
     DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(StaticValueProvider.of(PROJECT_ID),
-        null, mockDatastoreFactory);
+        null, mockDatastoreFactory, new FakeWriteBatcher());
     DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
     doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
     doFnTester.processBundle(mutations);
 
     int start = 0;
     while (start < numMutations) {
-      int end = Math.min(numMutations, start + DATASTORE_BATCH_UPDATE_LIMIT);
+      int end = Math.min(numMutations, start + DATASTORE_BATCH_UPDATE_ENTITIES_START);
       CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
       commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
       commitRequest.addAllMutations(mutations.subList(start, end));
@@ -662,7 +662,7 @@ public class DatastoreV1Test {
     }
 
     DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(StaticValueProvider.of(PROJECT_ID),
-        null, mockDatastoreFactory);
+        null, mockDatastoreFactory, new FakeWriteBatcher());
     DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
     doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
     doFnTester.processBundle(mutations);
@@ -896,6 +896,50 @@ public class DatastoreV1Test {
         .apply(DatastoreIO.v1().write().withProjectId(options.getDatastoreProject()));
   }
 
+  @Test
+  public void testWriteBatcherWithoutData() {
+    DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
+    writeBatcher.start();
+    assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START, writeBatcher.nextBatchSize(0));
+  }
+
+  @Test
+  public void testWriteBatcherFastQueries() {
+    DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
+    writeBatcher.start();
+    writeBatcher.addRequestLatency(0, 1000, 200);
+    writeBatcher.addRequestLatency(0, 1000, 200);
+    assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_LIMIT, writeBatcher.nextBatchSize(0));
+  }
+
+  @Test
+  public void testWriteBatcherSlowQueries() {
+    DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
+    writeBatcher.start();
+    writeBatcher.addRequestLatency(0, 10000, 200);
+    writeBatcher.addRequestLatency(0, 10000, 200);
+    assertEquals(100, writeBatcher.nextBatchSize(0));
+  }
+
+  @Test
+  public void testWriteBatcherSizeNotBelowMinimum() {
+    DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
+    writeBatcher.start();
+    writeBatcher.addRequestLatency(0, 30000, 50);
+    writeBatcher.addRequestLatency(0, 30000, 50);
+    assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_MIN, writeBatcher.nextBatchSize(0));
+  }
+
+  @Test
+  public void testWriteBatcherSlidingWindow() {
+    DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
+    writeBatcher.start();
+    writeBatcher.addRequestLatency(0, 30000, 50);
+    writeBatcher.addRequestLatency(50000, 5000, 200);
+    writeBatcher.addRequestLatency(100000, 5000, 200);
+    assertEquals(200, writeBatcher.nextBatchSize(150000));
+  }
+
   /** Helper Methods */
 
   /** A helper function that verifies if all the queries have unique keys. */
@@ -1039,4 +1083,20 @@ public class DatastoreV1Test {
     }
     return queries;
   }
+
+  /**
+   * A WriteBatcher for unit tests, which does no timing-based adjustments (so unit tests have
+   * consistent results).
+   */
+  static class FakeWriteBatcher implements DatastoreV1.WriteBatcher {
+    @Override
+    public void start() {}
+    @Override
+    public void addRequestLatency(long timeSinceEpochMillis, long latencyMillis, int numMutations) {
+    }
+    @Override
+    public int nextBatchSize(long timeSinceEpochMillis) {
+      return DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START;
+    }
+  }
 }


[2/2] beam git commit: [BEAM-2439] Dynamic sizing of Datastore write RPCs

Posted by lc...@apache.org.
[BEAM-2439] Dynamic sizing of Datastore write RPCs

This closes #3390


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9ed0af8f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9ed0af8f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9ed0af8f

Branch: refs/heads/master
Commit: 9ed0af8f2c5dabf3af5d664c473a49166b96d7d2
Parents: 3dc454a 0292a24
Author: Luke Cwik <lc...@google.com>
Authored: Thu Jun 22 13:34:12 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Jun 22 13:34:12 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  | 124 ++++++++++++++++---
 .../sdk/io/gcp/datastore/MovingAverage.java     |  50 ++++++++
 .../sdk/io/gcp/datastore/DatastoreV1Test.java   |  72 ++++++++++-
 3 files changed, 225 insertions(+), 21 deletions(-)
----------------------------------------------------------------------