Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/02/18 01:41:56 UTC

[GitHub] [beam] robertwb commented on a change in pull request #13924: [BEAM-11740] Estimate PCollection byte size

robertwb commented on a change in pull request #13924:
URL: https://github.com/apache/beam/pull/13924#discussion_r578065831



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
##########
@@ -305,4 +336,59 @@ public double getProgress() {
       return delegate.getProgress();
     }
   }
+
+  private static class SampleByteSizeDistribution<T> {
+    /** Basic implementation of {@link ElementByteSizeObserver} for use in size estimation. */
+    private static class ByteSizeObserver extends ElementByteSizeObserver {
+      private long observedSize = 0;
+
+      @Override
+      protected void reportElementSize(long elementSize) {
+        observedSize += elementSize;
+      }
+    }
+
+    final Distribution distribution;
+
+    public SampleByteSizeDistribution(Distribution distribution) {
+      this.distribution = distribution;
+    }
+
+    public void tryUpdate(T value, Coder<T> coder) throws Exception {
+      if (shouldSampleElement() || coder.isRegisterByteSizeObserverCheap(value)) {

Review comment:
       I think "cheap" is still relative; we should probably not sample every element even in that case (e.g., the element could be a bunch of nested tuples).
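A minimal sketch of that suggestion, assuming a simplified stand-in for the coder: the class name SampledSizeEstimator and the ToLongFunction sizer are hypothetical and not part of Beam. Every element goes through the same sampling gate as shouldSampleElement() in the diff. A "cheap" check, if kept, would at most choose how to measure a sampled element, never whether to sample it.

```java
import java.util.Random;
import java.util.function.ToLongFunction;

/**
 * Hypothetical estimator: every element passes through the sampler, even when
 * measuring it is considered cheap, so the per-element cost stays bounded.
 */
final class SampledSizeEstimator<T> {
  // Same constants as the code under review.
  private static final int SAMPLING_CUTOFF = 10;
  private static final int SAMPLING_TOKEN_UPPER_BOUND = 1_000_000;

  private final ToLongFunction<? super T> sizer; // stand-in for coder-based size measurement
  private final Random random;
  private int samplingToken = 0;
  private long sampledCount = 0;
  private long sampledBytes = 0;

  SampledSizeEstimator(ToLongFunction<? super T> sizer, Random random) {
    this.sizer = sizer;
    this.random = random;
  }

  /** Samples the Nth element with probability min(1, SAMPLING_CUTOFF / N), as in the diff. */
  private boolean shouldSampleElement() {
    samplingToken = Math.min(samplingToken + 1, SAMPLING_TOKEN_UPPER_BOUND);
    return random.nextInt(samplingToken) < SAMPLING_CUTOFF;
  }

  void tryUpdate(T value) {
    // No "|| isCheap(value)" bypass: cheap elements are subsampled like everything else.
    if (!shouldSampleElement()) {
      return;
    }
    sampledCount++;
    sampledBytes += sizer.applyAsLong(value);
  }

  long sampledCount() {
    return sampledCount;
  }

  double meanSampledSize() {
    return sampledCount == 0 ? 0.0 : (double) sampledBytes / sampledCount;
  }
}
```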

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
##########
@@ -305,4 +336,59 @@ public double getProgress() {
       return delegate.getProgress();
     }
   }
+
+  private static class SampleByteSizeDistribution<T> {
+    /** Basic implementation of {@link ElementByteSizeObserver} for use in size estimation. */
+    private static class ByteSizeObserver extends ElementByteSizeObserver {
+      private long observedSize = 0;
+
+      @Override
+      protected void reportElementSize(long elementSize) {
+        observedSize += elementSize;
+      }
+    }
+
+    final Distribution distribution;
+
+    public SampleByteSizeDistribution(Distribution distribution) {
+      this.distribution = distribution;
+    }
+
+    public void tryUpdate(T value, Coder<T> coder) throws Exception {
+      if (shouldSampleElement() || coder.isRegisterByteSizeObserverCheap(value)) {
+        // First try using byte size observer
+        ByteSizeObserver observer = new ByteSizeObserver();
+        coder.registerByteSizeObserver(value, observer);
+
+        if (!observer.getIsLazy()) {
+          observer.advance();
+          this.distribution.update(observer.observedSize);
+        } else {
+          // Coder byte size observation is lazy (requires iteration for observation) so fall back
+          // to counting output stream
+          CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream());
+          coder.encode(value, os);
+          this.distribution.update(os.getCount());
+        }
+      }
+    }
+
+    // Lowest sampling probability: 0.001%.
+    private static final int SAMPLING_TOKEN_UPPER_BOUND = 1000000;
+    private static final int SAMPLING_CUTOFF = 10;
+    private int samplingToken = 0;
+    private Random randomGenerator = new Random();
+
+    private boolean shouldSampleElement() {
+      // Sampling probability decreases as the element count is increasing.
+      // We unconditionally sample the first samplingCutoff elements. For the
+      // next samplingCutoff elements, the sampling probability drops from 100%
+      // to 50%. The probability of sampling the Nth element is:
+      // min(1, samplingCutoff / N), with an additional lower bound of
+      // samplingCutoff / samplingTokenUpperBound. This algorithm may be refined
+      // later.
+      samplingToken = Math.min(samplingToken + 1, SAMPLING_TOKEN_UPPER_BOUND);
+      return randomGenerator.nextInt(samplingToken) < SAMPLING_CUTOFF;

Review comment:
       Maybe drop a TODO to implement https://en.wikipedia.org/wiki/Reservoir_sampling#Fast_Approximation rather than calling nextInt for every element, which could be expensive.
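A rough sketch of the skip-count idea behind that suggestion, assuming the same constants as the diff. GapSampler is a hypothetical name, and this is not the exact algorithm from the linked page: it keeps no reservoir and only borrows the trick of drawing the gap to the next sampled element from a geometric distribution, treating the sampling probability as constant for the length of each gap. Elements inside a gap then cost a counter decrement instead of a nextInt call.

```java
import java.util.Random;

/**
 * Hypothetical skip-count sampler: instead of one random draw per element, it draws the
 * gap to the next sampled element from a geometric distribution (an approximation).
 */
final class GapSampler {
  private static final int SAMPLING_CUTOFF = 10;
  private static final long SAMPLING_TOKEN_UPPER_BOUND = 1_000_000L;

  private final Random random;
  private long seen = 0;          // elements offered so far
  private long skipRemaining = 0; // elements still to skip before the next sample

  GapSampler(Random random) {
    this.random = random;
  }

  boolean shouldSampleElement() {
    seen++;
    if (skipRemaining > 0) {
      skipRemaining--; // cheap path: no random number drawn
      return false;
    }
    // This element is sampled; draw the gap before the next one using the
    // next element's probability, min(1, cutoff / (seen + 1)).
    skipRemaining = drawGap(samplingProbability(seen + 1));
    return true;
  }

  private static double samplingProbability(long n) {
    return Math.min(1.0, (double) SAMPLING_CUTOFF / Math.min(n, SAMPLING_TOKEN_UPPER_BOUND));
  }

  /** Failures before the first success in Bernoulli(p) trials, via the inverse CDF. */
  private long drawGap(double p) {
    if (p >= 1.0) {
      return 0;
    }
    double u = 1.0 - random.nextDouble(); // in (0, 1], so log(u) is finite
    return (long) Math.floor(Math.log(u) / Math.log1p(-p));
  }
}
```

With these constants, the expected number of sampled elements among the first N (for N up to 1,000,000) is about 10 + 10 ln(N/10), roughly 125 for N = 1,000,000, so random draws fall from one per element to about one per sampled element. Because p is evaluated at the start of each gap, the approximation slightly oversamples relative to min(1, 10/N).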

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
##########
@@ -305,4 +336,59 @@ public double getProgress() {
       return delegate.getProgress();
     }
   }
+
+  private static class SampleByteSizeDistribution<T> {
+    /** Basic implementation of {@link ElementByteSizeObserver} for use in size estimation. */
+    private static class ByteSizeObserver extends ElementByteSizeObserver {
+      private long observedSize = 0;
+
+      @Override
+      protected void reportElementSize(long elementSize) {
+        observedSize += elementSize;
+      }
+    }
+
+    final Distribution distribution;
+
+    public SampleByteSizeDistribution(Distribution distribution) {
+      this.distribution = distribution;
+    }
+
+    public void tryUpdate(T value, Coder<T> coder) throws Exception {
+      if (shouldSampleElement() || coder.isRegisterByteSizeObserverCheap(value)) {
+        // First try using byte size observer
+        ByteSizeObserver observer = new ByteSizeObserver();
+        coder.registerByteSizeObserver(value, observer);
+
+        if (!observer.getIsLazy()) {
+          observer.advance();
+          this.distribution.update(observer.observedSize);
+        } else {
+          // Coder byte size observation is lazy (requires iteration for observation) so fall back

Review comment:
       So this means we'll iterate over the element twice, once here and once when it's consumed, right? We should probably at least drop a TODO to avoid this. (This iteration will also update the observer, so it will be more expensive.)
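One possible direction for that TODO, sketched generically rather than with Beam's ElementByteSizeObserver: wrap the lazy iterable so its size is accumulated while the downstream consumer iterates it, giving a single pass. SizeObservingIterable, the per-element sizer, and the completion callback are all hypothetical.

```java
import java.util.Iterator;
import java.util.function.LongConsumer;
import java.util.function.ToLongFunction;

/**
 * Hypothetical wrapper: measures an iterable while the consumer iterates it, so the
 * elements are traversed once instead of once for measurement and once for consumption.
 */
final class SizeObservingIterable<T> implements Iterable<T> {
  private final Iterable<T> delegate;
  private final ToLongFunction<? super T> elementSize; // hypothetical per-element sizer
  private final LongConsumer onComplete;               // receives the total when iteration ends

  SizeObservingIterable(
      Iterable<T> delegate, ToLongFunction<? super T> elementSize, LongConsumer onComplete) {
    this.delegate = delegate;
    this.elementSize = elementSize;
    this.onComplete = onComplete;
  }

  @Override
  public Iterator<T> iterator() {
    Iterator<T> it = delegate.iterator();
    return new Iterator<T>() {
      private long total = 0;
      private boolean reported = false;

      @Override
      public boolean hasNext() {
        boolean more = it.hasNext();
        if (!more && !reported) {
          reported = true; // report exactly once per iterator
          onComplete.accept(total);
        }
        return more;
      }

      @Override
      public T next() {
        T value = it.next();
        total += elementSize.applyAsLong(value); // measured during the consumer's own pass
        return value;
      }
    };
  }
}
```

The total is reported only when the consumer calls hasNext() after the last element (as a for-each loop does); a consumer that stops early produces no report, and each call to iterator() reports its own total.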



