Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/02/12 01:25:00 UTC

[jira] [Work logged] (BEAM-11740) Add PCollection size estimate to Java SDK harness

     [ https://issues.apache.org/jira/browse/BEAM-11740?focusedWorklogId=551612&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-551612 ]

ASF GitHub Bot logged work on BEAM-11740:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Feb/21 01:24
            Start Date: 12/Feb/21 01:24
    Worklog Time Spent: 10m 
      Work Description: kileys commented on a change in pull request #13924:
URL: https://github.com/apache/beam/pull/13924#discussion_r574939104



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
##########
@@ -305,4 +338,59 @@ public double getProgress() {
       return delegate.getProgress();
     }
   }
+
+  private static class SampleByteSizeEstimateDistribution<T> {
+    /** Basic implementation of {@link ElementByteSizeObserver} for use in size estimation. */
+    private static class ByteSizeObserver extends ElementByteSizeObserver {
+      private long observedSize = 0;
+
+      @Override
+      protected void reportElementSize(long elementSize) {
+        observedSize += elementSize;
+      }
+    }
+
+    final Distribution distribution;
+
+    public SampleByteSizeEstimateDistribution(Distribution distribution) {
+      this.distribution = distribution;
+    }
+
+    public void tryUpdate(T value, Coder<T> coder) throws Exception {
+      if (shouldSampleElement() || coder.isRegisterByteSizeObserverCheap(value)) {
+        // First try using byte size observer
+        ByteSizeObserver observer = new ByteSizeObserver();
+        coder.registerByteSizeObserver(value, observer);
+
+        if (!observer.getIsLazy()) {
+          observer.advance();
+          this.distribution.update(observer.observedSize);
+        } else {
+          // Coder byte size observation is lazy (requires iteration for observation) so fall back
+          // to counting output stream
+          CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream());
+          coder.encode(value, os);
+          this.distribution.update(os.getCount());
+        }
+      }
+    }
+
+    // Lowest sampling probability: 0.001%.
+    private static final int SAMPLING_TOKEN_UPPER_BOUND = 1000000;
+    private static final int SAMPLING_CUTOFF = 10;
+    private int samplingToken = 0;
+    private Random randomGenerator = new Random();
+
+    private boolean shouldSampleElement() {

Review comment:
       I copied the sampling logic from the legacy worker. Do we want to be able to customize or update these values?
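The body of `shouldSampleElement()` is cut off in the quoted diff. Below is a minimal, hypothetical sketch of one sampling scheme that is consistent with the constants shown (`SAMPLING_TOKEN_UPPER_BOUND = 1000000`, `SAMPLING_CUTOFF = 10`) and with the comment "Lowest sampling probability: 0.001%", since 10 / 1,000,000 = 0.001%. It always samples the first `cutoff` elements, then samples the Nth element with probability cutoff / N, never dropping below cutoff / tokenUpperBound. The class name `DecayingSampler` and its constructor are assumptions for illustration; this is not the code from the PR or the legacy worker.

```java
import java.util.Random;

/**
 * Hypothetical decaying sampler (illustration only, not the PR's code).
 * Samples the first `cutoff` elements unconditionally; afterwards the Nth
 * element is sampled with probability cutoff / N, floored at
 * cutoff / tokenUpperBound once N reaches tokenUpperBound.
 */
final class DecayingSampler {
  private final int tokenUpperBound;
  private final int cutoff;
  private final Random random;
  // Number of elements seen so far, saturating at tokenUpperBound.
  private int samplingToken = 0;

  DecayingSampler(int tokenUpperBound, int cutoff, Random random) {
    if (cutoff <= 0 || tokenUpperBound < cutoff) {
      throw new IllegalArgumentException("need 0 < cutoff <= tokenUpperBound");
    }
    this.tokenUpperBound = tokenUpperBound;
    this.cutoff = cutoff;
    this.random = random;
  }

  boolean shouldSampleElement() {
    // Increment without overflowing past the upper bound.
    if (samplingToken < tokenUpperBound) {
      samplingToken++;
    }
    // nextInt(n) is uniform over [0, n), so this is true with
    // probability min(1, cutoff / samplingToken).
    return random.nextInt(samplingToken) < cutoff;
  }
}
```

With `new DecayingSampler(1_000_000, 10, new Random())` the probability floor is 10 / 1,000,000 = 0.001%. Passing the bounds through a constructor instead of `static final` constants is one possible way to make them customizable, as the review comment asks; the sketch does not imply that Beam adopted this.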


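When the coder's byte-size observer is lazy, `tryUpdate` in the diff falls back to encoding the value into a `CountingOutputStream` wrapped around `ByteStreams.nullOutputStream()` and recording the byte count in the `Distribution`. The sketch below shows that fallback in isolation, assuming no Beam or Guava dependency: `SimpleCoder`, `ByteCountingStream`, `SizeDistribution`, and `EncodedSizeEstimator` are hypothetical stand-ins written for illustration, not Beam APIs.

```java
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical stand-in for a Beam Coder: only the encode step matters here. */
interface SimpleCoder<T> {
  void encode(T value, OutputStream out) throws IOException;
}

/** Discards written bytes but counts them (a dependency-free counting null stream). */
final class ByteCountingStream extends OutputStream {
  private long count = 0;

  @Override
  public void write(int b) {
    count++;
  }

  @Override
  public void write(byte[] b, int off, int len) {
    // Bytes are dropped; only their number is recorded.
    count += len;
  }

  long getCount() {
    return count;
  }
}

/** Hypothetical accumulator for the sum/count/min/max that a size distribution reports. */
final class SizeDistribution {
  long count = 0;
  long sum = 0;
  long min = Long.MAX_VALUE;
  long max = Long.MIN_VALUE;

  void update(long value) {
    count++;
    sum += value;
    min = Math.min(min, value);
    max = Math.max(max, value);
  }

  double mean() {
    return count == 0 ? 0.0 : (double) sum / count;
  }
}

final class EncodedSizeEstimator {
  /** Encodes the value into a counting sink and records the encoded length. */
  static <T> long recordEncodedSize(T value, SimpleCoder<T> coder, SizeDistribution dist)
      throws IOException {
    ByteCountingStream out = new ByteCountingStream();
    coder.encode(value, out);
    dist.update(out.getCount());
    return out.getCount();
  }
}
```

For example, a coder that writes a string's UTF-8 bytes records 5 for "hello". Because this path serializes the whole element, it costs more than the observer path; in the diff it is only reached for elements that are sampled (or cheap to observe) and whose observer reports itself as lazy.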




Issue Time Tracking
-------------------

            Worklog Id:     (was: 551612)
    Remaining Estimate: 0h
            Time Spent: 10m

> Add PCollection size estimate to Java SDK harness
> -------------------------------------------------
>
>                 Key: BEAM-11740
>                 URL: https://issues.apache.org/jira/browse/BEAM-11740
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-java-harness
>            Reporter: Kiley Sok
>            Assignee: Kiley Sok
>            Priority: P2
>          Time Spent: 10m
>  Remaining Estimate: 0h
>



