You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2021/04/06 02:23:38 UTC

[beam] branch master updated: Optimize reservoir sampling calculation

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ddf0c2  Optimize reservoir sampling calculation
     new bcced0c  Merge pull request #14406 from [BEAM-11836] Optimize reservoir sampling calculation in PCollectionConsumerRegistry
6ddf0c2 is described below

commit 6ddf0c2ff6706883771ec9cb13309101b34b80c4
Author: kileys <ki...@google.com>
AuthorDate: Fri Apr 2 00:57:38 2021 +0000

    Optimize reservoir sampling calculation
---
 .../harness/data/PCollectionConsumerRegistry.java  | 49 ++++++++++++++++------
 1 file changed, 36 insertions(+), 13 deletions(-)

diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
index 14d245dc..457cbe8 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
@@ -377,23 +377,46 @@ public class PCollectionConsumerRegistry {
       }
     }
 
-    // Lowest sampling probability: 0.001%.
-    private static final int SAMPLING_TOKEN_UPPER_BOUND = 1000000;
-    private static final int SAMPLING_CUTOFF = 10;
-    private int samplingToken = 0;
+    private static final int RESERVOIR_SIZE = 10;
+    private static final int SAMPLING_THRESHOLD = 30;
+    private long samplingToken = 0;
+    private long nextSamplingToken = 0;
     private Random randomGenerator = new Random();
 
-    // TODO(BEAM-11836): Implement fast approximation for reservoir sampling.
     private boolean shouldSampleElement() {
       // Sampling probability decreases as the element count is increasing.
-      // We unconditionally sample the first samplingCutoff elements. For the
-      // next samplingCutoff elements, the sampling probability drops from 100%
-      // to 50%. The probability of sampling the Nth element is:
-      // min(1, samplingCutoff / N), with an additional lower bound of
-      // samplingCutoff / samplingTokenUpperBound. This algorithm may be refined
-      // later.
-      samplingToken = Math.min(samplingToken + 1, SAMPLING_TOKEN_UPPER_BOUND);
-      return randomGenerator.nextInt(samplingToken) < SAMPLING_CUTOFF;
+      // The Nth element is sampled with probability min(1, RESERVOIR_SIZE / N), so the first
+      // RESERVOIR_SIZE elements are always sampled. Calling nextInt(N) for every element is
+      // expensive, so past SAMPLING_THRESHOLD we draw the geometric gap to the next sample.
+      // https://erikerlandson.github.io/blog/2015/11/20/very-fast-reservoir-sampling/
+
+      // Reset samplingToken before it overflows and restart the exact (nextInt) phase.
+      if (samplingToken + 1 == Long.MAX_VALUE) {
+        samplingToken = 0;
+        nextSamplingToken = 0;
+      }
+
+      samplingToken++;
+      // Exact phase (nextSamplingToken == 0) ends at the first sample past SAMPLING_THRESHOLD.
+      if (nextSamplingToken == 0) {
+        if (randomGenerator.nextInt((int) samplingToken) < RESERVOIR_SIZE) {
+          if (samplingToken > SAMPLING_THRESHOLD) {
+            nextSamplingToken = getNextSamplingToken(samplingToken);
+          }
+          return true;
+        }
+      } else if (samplingToken >= nextSamplingToken) {
+        nextSamplingToken = getNextSamplingToken(samplingToken);
+        return true;
+      }
+      return false;
+    }
+
+    private long getNextSamplingToken(long samplingToken) {
+      // Skip count before the next sample ~ Geometric(RESERVOIR_SIZE / samplingToken).
+      double skip = Math.log1p(-randomGenerator.nextDouble())
+          / Math.log1p(-RESERVOIR_SIZE / (double) samplingToken);
+      return samplingToken + 1 + (long) skip;
     }
   }
 }