You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/08/16 17:54:29 UTC

[1/2] beam git commit: Add client-side throttling.

Repository: beam
Updated Branches:
  refs/heads/master 724eda37e -> d0deb6cc8


Add client-side throttling.

The approach used is client-side throttling as described in the "Handling
Overload" chapter of the Google SRE book:
https://landing.google.com/sre/book/chapters/handling-overload.html#client-side-throttling-a7sYUg
By having individual workers back off in response to high error rates, we relieve
pressure on the Datastore service, increasing the chance that the workload can
complete successfully. This matches the implementation in the Java SDK.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/06bd00cc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/06bd00cc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/06bd00cc

Branch: refs/heads/master
Commit: 06bd00cc228b00d42e136dc3496db21b10909f4f
Parents: 724eda3
Author: Colin Phipps <fi...@google.com>
Authored: Mon Jul 24 21:01:09 2017 +0000
Committer: chamikara@google.com <ch...@google.com>
Committed: Wed Aug 16 10:53:29 2017 -0700

----------------------------------------------------------------------
 .../io/gcp/datastore/v1/adaptive_throttler.py   | 90 +++++++++++++++++++
 .../gcp/datastore/v1/adaptive_throttler_test.py | 94 ++++++++++++++++++++
 .../io/gcp/datastore/v1/datastoreio.py          | 16 +++-
 .../apache_beam/io/gcp/datastore/v1/helper.py   | 20 ++++-
 4 files changed, 212 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/06bd00cc/sdks/python/apache_beam/io/gcp/datastore/v1/adaptive_throttler.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/adaptive_throttler.py b/sdks/python/apache_beam/io/gcp/datastore/v1/adaptive_throttler.py
new file mode 100644
index 0000000..4dfd675
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/adaptive_throttler.py
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Utility functions & classes that are _not_ specific to the datastore client.
+#
+# For internal use only; no backwards-compatibility guarantees.
+
+import random
+
+from apache_beam.io.gcp.datastore.v1 import util
+
+
+class AdaptiveThrottler(object):
+  """Implements adaptive throttling.
+
+  See
+  https://landing.google.com/sre/book/chapters/handling-overload.html#client-side-throttling-a7sYUg
+  for a full discussion of the use case and algorithm applied.
+  """
+
+  # The target minimum number of requests per window_ms, even if no
+  # requests succeed. Must be greater than 0, else we could throttle to zero.
+  # Because every decision is probabilistic, there is no guarantee that the
+  # request rate in any given interval will not be zero. (This is the +1 from
+  # the formula in
+  # https://landing.google.com/sre/book/chapters/handling-overload.html )
+  MIN_REQUESTS = 1
+
+  def __init__(self, window_ms, bucket_ms, overload_ratio):
+    """Args:
+      window_ms: int, length of history to consider, in ms, to set throttling.
+      bucket_ms: int, granularity of time buckets that we store data in, in ms.
+      overload_ratio: float, the target ratio between requests sent and
+          successful requests. This is "K" in the formula in
+          https://landing.google.com/sre/book/chapters/handling-overload.html
+    """
+    self._all_requests = util.MovingSum(window_ms, bucket_ms)
+    self._successful_requests = util.MovingSum(window_ms, bucket_ms)
+    self._overload_ratio = float(overload_ratio)
+    self._random = random.Random()
+
+  def _throttling_probability(self, now):
+    if not self._all_requests.has_data(now):
+      return 0
+    all_requests = self._all_requests.sum(now)
+    successful_requests = self._successful_requests.sum(now)
+    return max(  # Clamp: numerator < 0 when successes * K > requests.
+        0, (all_requests - self._overload_ratio * successful_requests)
+        / (all_requests + AdaptiveThrottler.MIN_REQUESTS))
+
+  def throttle_request(self, now):
+    """Determines whether one RPC attempt should be throttled.
+
+    This should be called once each time the caller intends to send an RPC; if
+    it returns True, drop or delay that request (calling this function again
+    after the delay).
+
+    Args:
+      now: int, time in ms since the epoch
+    Returns:
+      bool, True if the caller should throttle or delay the request.
+    """
+    throttling_probability = self._throttling_probability(now)
+    self._all_requests.add(now, 1)  # Counted even if throttled below.
+    return self._random.uniform(0, 1) < throttling_probability
+
+  def successful_request(self, now):
+    """Notifies the throttler of a successful request.
+
+    Must be called once for each request (for which throttle_request was
+    previously called) that succeeded.
+
+    Args:
+      now: int, time in ms since the epoch
+    """
+    self._successful_requests.add(now, 1)

http://git-wip-us.apache.org/repos/asf/beam/blob/06bd00cc/sdks/python/apache_beam/io/gcp/datastore/v1/adaptive_throttler_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/adaptive_throttler_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/adaptive_throttler_test.py
new file mode 100644
index 0000000..93b91ad
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/adaptive_throttler_test.py
@@ -0,0 +1,94 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+from mock import patch
+
+from apache_beam.io.gcp.datastore.v1.adaptive_throttler import AdaptiveThrottler
+
+
+class AdaptiveThrottlerTest(unittest.TestCase):
+
+  START_TIME = 1500000000000
+  SAMPLE_PERIOD = 60000
+  BUCKET = 1000
+  OVERLOAD_RATIO = 2
+
+  def setUp(self):
+    self._throttler = AdaptiveThrottler(
+        AdaptiveThrottlerTest.SAMPLE_PERIOD, AdaptiveThrottlerTest.BUCKET,
+        AdaptiveThrottlerTest.OVERLOAD_RATIO)
+
+  # As far as practical, keep these tests aligned with
+  # AdaptiveThrottlerTest.java.
+
+  def test_no_initial_throttling(self):
+    self.assertEqual(0, self._throttler._throttling_probability(
+        AdaptiveThrottlerTest.START_TIME))
+
+  def test_no_throttling_if_no_errors(self):
+    for t in range(AdaptiveThrottlerTest.START_TIME,
+                   AdaptiveThrottlerTest.START_TIME + 20):
+      self.assertFalse(self._throttler.throttle_request(t))
+      self._throttler.successful_request(t)
+    self.assertEqual(0, self._throttler._throttling_probability(
+        AdaptiveThrottlerTest.START_TIME + 20))
+
+  def test_no_throttling_after_errors_expire(self):
+    for t in range(AdaptiveThrottlerTest.START_TIME,
+                   AdaptiveThrottlerTest.START_TIME
+                   + AdaptiveThrottlerTest.SAMPLE_PERIOD, 100):
+      self._throttler.throttle_request(t)
+      # And no successful_request
+    self.assertLess(0, self._throttler._throttling_probability(
+        AdaptiveThrottlerTest.START_TIME + AdaptiveThrottlerTest.SAMPLE_PERIOD
+        ))
+    for t in range(AdaptiveThrottlerTest.START_TIME
+                   + AdaptiveThrottlerTest.SAMPLE_PERIOD,
+                   AdaptiveThrottlerTest.START_TIME
+                   + AdaptiveThrottlerTest.SAMPLE_PERIOD*2, 100):
+      self._throttler.throttle_request(t)
+      self._throttler.successful_request(t)
+
+    self.assertEqual(0, self._throttler._throttling_probability(
+        AdaptiveThrottlerTest.START_TIME +
+        AdaptiveThrottlerTest.SAMPLE_PERIOD*2))
+
+  @patch('random.Random')
+  def test_throttling_after_errors(self, mock_random):
+    mock_random().uniform.side_effect = [x/10.0 for x in range(0, 10)]*2
+    self._throttler = AdaptiveThrottler(
+        AdaptiveThrottlerTest.SAMPLE_PERIOD, AdaptiveThrottlerTest.BUCKET,
+        AdaptiveThrottlerTest.OVERLOAD_RATIO)
+    for t in range(AdaptiveThrottlerTest.START_TIME,
+                   AdaptiveThrottlerTest.START_TIME + 20):
+      throttled = self._throttler.throttle_request(t)
+      # 1/3rd of requests succeeding.
+      if t % 3 == 1:
+        self._throttler.successful_request(t)
+
+      if t > AdaptiveThrottlerTest.START_TIME + 10:
+        # Roughly 1/3rd succeeding, 1/3rd failing, 1/3rd throttled.
+        self.assertAlmostEqual(
+            0.33, self._throttler._throttling_probability(t), delta=0.1)
+        # Given the mocked random numbers, we expect 10..13 to be throttled and
+        # 14+ to be unthrottled.
+        self.assertEqual(t < AdaptiveThrottlerTest.START_TIME + 14, throttled)
+
+
+if __name__ == '__main__':
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/06bd00cc/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
index 0258814..3cfba93 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
@@ -32,6 +32,7 @@ except ImportError:
 from apache_beam.io.gcp.datastore.v1 import helper
 from apache_beam.io.gcp.datastore.v1 import query_splitter
 from apache_beam.io.gcp.datastore.v1 import util
+from apache_beam.io.gcp.datastore.v1.adaptive_throttler import AdaptiveThrottler
 from apache_beam.transforms import Create
 from apache_beam.transforms import DoFn
 from apache_beam.transforms import FlatMap
@@ -402,10 +403,15 @@ class _Mutate(PTransform):
           _Mutate.DatastoreWriteFn, "datastoreRpcSuccesses")
       self._rpc_errors = Metrics.counter(
           _Mutate.DatastoreWriteFn, "datastoreRpcErrors")
+      self._throttled_secs = Metrics.counter(
+          _Mutate.DatastoreWriteFn, "cumulativeThrottlingSeconds")
+      self._throttler = AdaptiveThrottler(window_ms=120000, bucket_ms=1000,
+                                          overload_ratio=1.25)
 
-    def _update_rpc_stats(self, successes=0, errors=0):
+    def _update_rpc_stats(self, successes=0, errors=0, throttled_secs=0):
       self._rpc_successes.inc(successes)
       self._rpc_errors.inc(errors)
+      self._throttled_secs.inc(throttled_secs)
 
     def start_bundle(self):
       self._mutations = []
@@ -415,7 +421,8 @@ class _Mutate(PTransform):
         self._target_batch_size = self._fixed_batch_size
       else:
         self._batch_sizer = _Mutate._DynamicBatchSizer()
-        self._target_batch_size = self._batch_sizer.get_batch_size(time.time())
+        self._target_batch_size = self._batch_sizer.get_batch_size(
+            time.time()*1000)
 
     def process(self, element):
       size = element.ByteSize()
@@ -435,12 +442,13 @@ class _Mutate(PTransform):
       # Flush the current batch of mutations to Cloud Datastore.
       _, latency_ms = helper.write_mutations(
           self._datastore, self._project, self._mutations,
-          self._update_rpc_stats)
+          self._throttler, self._update_rpc_stats,
+          throttle_delay=_Mutate._WRITE_BATCH_TARGET_LATENCY_MS/1000)
       logging.debug("Successfully wrote %d mutations in %dms.",
                     len(self._mutations), latency_ms)
 
       if not self._fixed_batch_size:
-        now = time.time()
+        now = time.time()*1000
         self._batch_sizer.report_latency(now, latency_ms, len(self._mutations))
         self._target_batch_size = self._batch_sizer.get_batch_size(now)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/06bd00cc/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
index da14cc4..5cde255 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
@@ -22,6 +22,7 @@ For internal use only; no backwards-compatibility guarantees.
 
 import errno
 from socket import error as SocketError
+import logging
 import sys
 import time
 
@@ -167,7 +168,8 @@ def is_key_valid(key):
   return key.path[-1].HasField('id') or key.path[-1].HasField('name')
 
 
-def write_mutations(datastore, project, mutations, rpc_stats_callback=None):
+def write_mutations(datastore, project, mutations, throttler,
+                    rpc_stats_callback=None, throttle_delay=1):
   """A helper function to write a batch of mutations to Cloud Datastore.
 
   If a commit fails, it will be retried upto 5 times. All mutations in the
@@ -180,8 +182,10 @@ def write_mutations(datastore, project, mutations, rpc_stats_callback=None):
     project: str, project id
     mutations: list of google.cloud.proto.datastore.v1.datastore_pb2.Mutation
     rpc_stats_callback: a function to call with arguments `successes` and
-        `failures`; this is called to record successful and failed RPCs to
-        Datastore.
+        `failures` and `throttled_secs`; this is called to record successful
+        and failed RPCs to Datastore and time spent waiting for throttling.
+    throttler: AdaptiveThrottler, to use to select requests to be throttled.
+    throttle_delay: float, time in seconds to sleep when throttled.
 
   Returns a tuple of:
     CommitResponse, the response from Datastore;
@@ -196,12 +200,20 @@ def write_mutations(datastore, project, mutations, rpc_stats_callback=None):
   @retry.with_exponential_backoff(num_retries=5,
                                   retry_filter=retry_on_rpc_error)
   def commit(request):
+    # Client-side throttling.
+    while throttler.throttle_request(time.time()*1000):
+      logging.info("Delaying request for %ds due to previous failures",
+                   throttle_delay)
+      time.sleep(throttle_delay)
+      rpc_stats_callback(throttled_secs=throttle_delay)
+
     try:
       start_time = time.time()
       response = datastore.commit(request)
       end_time = time.time()
-      rpc_stats_callback(successes=1)
 
+      rpc_stats_callback(successes=1)
+      throttler.successful_request(start_time*1000)
       commit_time_ms = int((end_time-start_time)*1000)
       return response, commit_time_ms
     except (RPCError, SocketError):


[2/2] beam git commit: This closes #3644

Posted by ch...@apache.org.
This closes #3644


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d0deb6cc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d0deb6cc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d0deb6cc

Branch: refs/heads/master
Commit: d0deb6cc821b95aa19484a9bfc94abdace1616bd
Parents: 724eda3 06bd00c
Author: chamikara@google.com <ch...@google.com>
Authored: Wed Aug 16 10:54:05 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Wed Aug 16 10:54:05 2017 -0700

----------------------------------------------------------------------
 .../io/gcp/datastore/v1/adaptive_throttler.py   | 90 +++++++++++++++++++
 .../gcp/datastore/v1/adaptive_throttler_test.py | 94 ++++++++++++++++++++
 .../io/gcp/datastore/v1/datastoreio.py          | 16 +++-
 .../apache_beam/io/gcp/datastore/v1/helper.py   | 20 ++++-
 4 files changed, 212 insertions(+), 8 deletions(-)
----------------------------------------------------------------------