You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2021/04/06 02:23:38 UTC
[beam] branch master updated: Optimize reservoir sampling calculation
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6ddf0c2 Optimize reservoir sampling calculation
new bcced0c Merge pull request #14406 from [BEAM-11836] Optimize reservoir sampling calculation in PCollectionConsumerRegistry
6ddf0c2 is described below
commit 6ddf0c2ff6706883771ec9cb13309101b34b80c4
Author: kileys <ki...@google.com>
AuthorDate: Fri Apr 2 00:57:38 2021 +0000
Optimize reservoir sampling calculation
---
.../harness/data/PCollectionConsumerRegistry.java | 49 ++++++++++++++++------
1 file changed, 36 insertions(+), 13 deletions(-)
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
index 14d245dc..457cbe8 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
@@ -377,23 +377,46 @@ public class PCollectionConsumerRegistry {
}
}
- // Lowest sampling probability: 0.001%.
- private static final int SAMPLING_TOKEN_UPPER_BOUND = 1000000;
- private static final int SAMPLING_CUTOFF = 10;
- private int samplingToken = 0;
+ private static final int RESERVOIR_SIZE = 10; // R: the n-th element is sampled with probability ~R / n
+ private static final int SAMPLING_THRESHOLD = 30; // beyond this element count, sampling becomes gap-based
+ private long samplingToken = 0; // number of elements seen since the last reset
+ private long nextSamplingToken = 0; // index of the next gap-based sample; 0 = per-element mode
private Random randomGenerator = new Random();
- // TODO(BEAM-11836): Implement fast approximation for reservoir sampling.
private boolean shouldSampleElement() {
// Sampling probability decreases as the element count is increasing.
- // We unconditionally sample the first samplingCutoff elements. For the
- // next samplingCutoff elements, the sampling probability drops from 100%
- // to 50%. The probability of sampling the Nth element is:
- // min(1, samplingCutoff / N), with an additional lower bound of
- // samplingCutoff / samplingTokenUpperBound. This algorithm may be refined
- // later.
- samplingToken = Math.min(samplingToken + 1, SAMPLING_TOKEN_UPPER_BOUND);
- return randomGenerator.nextInt(samplingToken) < SAMPLING_CUTOFF;
+ // We unconditionally sample the first RESERVOIR_SIZE elements; the n-th element is then
+ // sampled with probability (approximately) RESERVOIR_SIZE / n. Calling nextInt for every
+ // element is expensive, so past SAMPLING_THRESHOLD we instead draw the gap to the next sample.
+ // https://erikerlandson.github.io/blog/2015/11/20/very-fast-reservoir-sampling/
+
+ // Restart counting before samplingToken overflows; this returns to per-element sampling.
+ if (samplingToken + 1 == Long.MAX_VALUE) {
+ samplingToken = 0;
+ nextSamplingToken = 0; // not getNextSamplingToken(0), which would divide by zero
+ }
+
+ samplingToken++;
+ // Per-element sampling until the first sample taken past SAMPLING_THRESHOLD.
+ if (nextSamplingToken == 0) {
+ if (randomGenerator.nextInt((int) samplingToken) < RESERVOIR_SIZE) { // P = min(1, R / n)
+ if (samplingToken > SAMPLING_THRESHOLD) {
+ nextSamplingToken = getNextSamplingToken(samplingToken);
+ }
+ return true;
+ }
+ } else if (samplingToken >= nextSamplingToken) {
+ nextSamplingToken = getNextSamplingToken(samplingToken);
+ return true;
+ }
+ return false;
+ }
+
+ private long getNextSamplingToken(long samplingToken) { // returns 0 if samplingToken < RESERVOIR_SIZE
+ double gap = // elements to skip before the next sample ~ Geometric(p = RESERVOIR_SIZE / samplingToken)
+ Math.log(1.0 - randomGenerator.nextDouble())
+ / Math.log1p(-RESERVOIR_SIZE / (double) samplingToken); // log1p stays accurate for tiny p
+ return Double.isNaN(gap) ? 0 : samplingToken + 1 + (long) gap; // long: no int saturation
}
}
}