You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2021/03/01 23:12:16 UTC

[beam] branch master updated: [BEAM-11740] Estimate PCollection byte size

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d807210  [BEAM-11740] Estimate PCollection byte size
     new 9150193  Merge pull request #13924 from [BEAM-11740] Estimate PCollection byte size
d807210 is described below

commit d807210c3aa28f34c13b89f3f16bc104051532b0
Author: kileys <ki...@google.com>
AuthorDate: Tue Feb 2 20:03:47 2021 +0000

    [BEAM-11740] Estimate PCollection byte size
---
 .../beam/runners/core/metrics/LabeledMetrics.java  |   6 +
 .../beam/sdk/metrics/DelegatingDistribution.java   |  66 +++++++++++
 .../beam/fn/harness/BeamFnDataWriteRunner.java     |   4 +-
 .../org/apache/beam/fn/harness/CombineRunners.java |   4 +-
 .../org/apache/beam/fn/harness/FlattenRunner.java  |  29 ++++-
 .../apache/beam/fn/harness/FnApiDoFnRunner.java    |   5 +-
 .../org/apache/beam/fn/harness/MapFnRunners.java   |  29 ++++-
 .../harness/data/PCollectionConsumerRegistry.java  | 119 ++++++++++++++++---
 .../beam/fn/harness/AssignWindowsRunnerTest.java   |  15 ++-
 .../beam/fn/harness/BeamFnDataReadRunnerTest.java  |   5 +-
 .../apache/beam/fn/harness/CombineRunnersTest.java | 108 ++++++++++++++---
 .../apache/beam/fn/harness/FlattenRunnerTest.java  |  47 ++++++--
 .../beam/fn/harness/FnApiDoFnRunnerTest.java       | 129 +++++++++++++++++----
 .../apache/beam/fn/harness/MapFnRunnersTest.java   |  32 +++--
 .../data/PCollectionConsumerRegistryTest.java      |  55 ++++++---
 15 files changed, 547 insertions(+), 106 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/LabeledMetrics.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/LabeledMetrics.java
index 2df2de9..f3d2793 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/LabeledMetrics.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/LabeledMetrics.java
@@ -19,7 +19,9 @@ package org.apache.beam.runners.core.metrics;
 
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.DelegatingCounter;
+import org.apache.beam.sdk.metrics.DelegatingDistribution;
 import org.apache.beam.sdk.metrics.DelegatingHistogram;
+import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.Histogram;
 import org.apache.beam.sdk.util.HistogramData;
 
@@ -40,6 +42,10 @@ public class LabeledMetrics {
     return new DelegatingCounter(metricName, processWideContainer);
   }
 
+  public static Distribution distribution(MonitoringInfoMetricName metricName) {
+    return new DelegatingDistribution(metricName);
+  }
+
   public static Histogram histogram(
       MonitoringInfoMetricName metricName,
       HistogramData.BucketType bucketType,
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingDistribution.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingDistribution.java
new file mode 100644
index 0000000..6cfe98e
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingDistribution.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Internal;
+
+/**
+ * Implementation of {@link Distribution} that delegates to the instance for the current context.
+ */
+@Internal
+public class DelegatingDistribution implements Metric, Distribution, Serializable {
+  private final MetricName name;
+  private final boolean processWideContainer;
+
+  public DelegatingDistribution(MetricName name) {
+    this(name, false);
+  }
+
+  public DelegatingDistribution(MetricName name, boolean processWideContainer) {
+    this.name = name;
+    this.processWideContainer = processWideContainer;
+  }
+
+  @Override
+  public void update(long value) {
+    MetricsContainer container =
+        this.processWideContainer
+            ? MetricsEnvironment.getProcessWideContainer()
+            : MetricsEnvironment.getCurrentContainer();
+    if (container != null) {
+      container.getDistribution(name).update(value);
+    }
+  }
+
+  @Override
+  public void update(long sum, long count, long min, long max) {
+    MetricsContainer container =
+        this.processWideContainer
+            ? MetricsEnvironment.getProcessWideContainer()
+            : MetricsEnvironment.getCurrentContainer();
+    if (container != null) {
+      container.getDistribution(name).update(sum, count, min, max);
+    }
+  }
+
+  @Override
+  public MetricName getName() {
+    return name;
+  }
+}
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
index 8d16460..b15499b 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.function.ThrowingRunnable;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -115,7 +116,8 @@ public class BeamFnDataWriteRunner<InputT> {
       pCollectionConsumerRegistry.register(
           getOnlyElement(pTransform.getInputsMap().values()),
           pTransformId,
-          (FnDataReceiver) (FnDataReceiver<WindowedValue<InputT>>) runner::consume);
+          (FnDataReceiver) (FnDataReceiver<WindowedValue<InputT>>) runner::consume,
+          ((WindowedValueCoder<InputT>) runner.coder).getValueCoder());
 
       finishFunctionRegistry.register(pTransformId, runner::close);
       return runner;
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
index 00f00bb..74b3a0b 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
@@ -186,8 +186,8 @@ public class CombineRunners {
       pCollectionConsumerRegistry.register(
           Iterables.getOnlyElement(pTransform.getInputsMap().values()),
           pTransformId,
-          (FnDataReceiver)
-              (FnDataReceiver<WindowedValue<KV<KeyT, InputT>>>) runner::processElement);
+          (FnDataReceiver) (FnDataReceiver<WindowedValue<KV<KeyT, InputT>>>) runner::processElement,
+          inputCoder);
       finishFunctionRegistry.register(pTransformId, runner::finishBundle);
 
       return runner;
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FlattenRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FlattenRunner.java
index b56085e..6493778 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FlattenRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FlattenRunner.java
@@ -32,13 +32,16 @@ import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
 import org.apache.beam.fn.harness.state.BeamFnStateClient;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Coder;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.function.ThrowingRunnable;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 
 /** Executes flatten PTransforms. */
@@ -85,14 +88,36 @@ public class FlattenRunner<InputT> {
       FnDataReceiver<WindowedValue<?>> receiver =
           pCollectionConsumerRegistry.getMultiplexingConsumer(output);
 
-      FlattenRunner<InputT> runner = new FlattenRunner<>();
+      RehydratedComponents components =
+          RehydratedComponents.forComponents(Components.newBuilder().putAllCoders(coders).build());
 
+      FlattenRunner<InputT> runner = new FlattenRunner<>();
       for (String pCollectionId : pTransform.getInputsMap().values()) {
         pCollectionConsumerRegistry.register(
-            pCollectionId, pTransformId, (FnDataReceiver) receiver);
+            pCollectionId,
+            pTransformId,
+            (FnDataReceiver) receiver,
+            getValueCoder(components, pCollections, pCollectionId));
       }
 
       return runner;
     }
+
+    private org.apache.beam.sdk.coders.Coder<?> getValueCoder(
+        RehydratedComponents components,
+        Map<String, PCollection> pCollections,
+        String pCollectionId)
+        throws IOException {
+      if (!pCollections.containsKey(pCollectionId)) {
+        throw new IllegalArgumentException(
+            String.format("Missing PCollection for id: %s", pCollectionId));
+      }
+      org.apache.beam.sdk.coders.Coder<?> coder =
+          components.getCoder(pCollections.get(pCollectionId).getCoderId());
+      if (coder instanceof WindowedValueCoder) {
+        coder = ((WindowedValueCoder<InputT>) coder).getValueCoder();
+      }
+      return coder;
+    }
   }
 }
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 95ca03e..12c0acc 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -645,7 +645,10 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
         throw new IllegalStateException("Unknown urn: " + pTransform.getSpec().getUrn());
     }
     pCollectionConsumerRegistry.register(
-        pTransform.getInputsOrThrow(mainInput), pTransformId, (FnDataReceiver) mainInputConsumer);
+        pTransform.getInputsOrThrow(mainInput),
+        pTransformId,
+        (FnDataReceiver) mainInputConsumer,
+        inputCoder);
 
     this.finishBundleArgumentProvider = new FinishBundleArgumentProvider();
     switch (pTransform.getSpec().getUrn()) {
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunners.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunners.java
index 417e6d6..a5b5b6b 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunners.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunners.java
@@ -30,14 +30,17 @@ import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
 import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
 import org.apache.beam.fn.harness.state.BeamFnStateClient;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.function.ThrowingFunction;
 import org.apache.beam.sdk.function.ThrowingRunnable;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 
 /**
@@ -129,12 +132,34 @@ public abstract class MapFnRunners {
 
       Mapper<InputT, OutputT> mapper = mapperFactory.create(pTransformId, pTransform, consumer);
 
+      RehydratedComponents components =
+          RehydratedComponents.forComponents(Components.newBuilder().putAllCoders(coders).build());
+      String pCollectionId = Iterables.getOnlyElement(pTransform.getInputsMap().values());
       pCollectionConsumerRegistry.register(
-          Iterables.getOnlyElement(pTransform.getInputsMap().values()),
+          pCollectionId,
           pTransformId,
-          (FnDataReceiver) (FnDataReceiver<WindowedValue<InputT>>) mapper::map);
+          (FnDataReceiver) (FnDataReceiver<WindowedValue<InputT>>) mapper::map,
+          getValueCoder(components, pCollections, pCollectionId));
       return mapper;
     }
+
+    private org.apache.beam.sdk.coders.Coder<?> getValueCoder(
+        RehydratedComponents components,
+        Map<String, PCollection> pCollections,
+        String pCollectionId)
+        throws IOException {
+      if (!pCollections.containsKey(pCollectionId)) {
+        throw new IllegalArgumentException(
+            String.format("Missing PCollection for id: %s", pCollectionId));
+      }
+
+      org.apache.beam.sdk.coders.Coder<?> coder =
+          components.getCoder(pCollections.get(pCollectionId).getCoderId());
+      if (coder instanceof WindowedValueCoder) {
+        coder = ((WindowedValueCoder<InputT>) coder).getValueCoder();
+      }
+      return coder;
+    }
   }
 
   @FunctionalInterface
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
index 375af89..14d245dc 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import org.apache.beam.fn.harness.HandlesSplits;
 import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
@@ -31,18 +32,24 @@ import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
 import org.apache.beam.runners.core.metrics.MonitoringInfoConstants.Labels;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants.Urns;
 import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
 import org.apache.beam.runners.core.metrics.SimpleExecutionState;
 import org.apache.beam.runners.core.metrics.SimpleStateRegistry;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ListMultimap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CountingOutputStream;
 
 /**
  * The {@code PCollectionConsumerRegistry} is used to maintain a collection of consuming
@@ -62,9 +69,12 @@ public class PCollectionConsumerRegistry {
   @SuppressWarnings({"rawtypes"})
   abstract static class ConsumerAndMetadata {
     public static ConsumerAndMetadata forConsumer(
-        FnDataReceiver consumer, String pTransformId, SimpleExecutionState state) {
+        FnDataReceiver consumer,
+        String pTransformId,
+        SimpleExecutionState state,
+        Coder valueCoder) {
       return new AutoValue_PCollectionConsumerRegistry_ConsumerAndMetadata(
-          consumer, pTransformId, state);
+          consumer, pTransformId, state, valueCoder);
     }
 
     public abstract FnDataReceiver getConsumer();
@@ -72,6 +82,8 @@ public class PCollectionConsumerRegistry {
     public abstract String getPTransformId();
 
     public abstract SimpleExecutionState getExecutionState();
+
+    public abstract Coder getValueCoder();
   }
 
   private ListMultimap<String, ConsumerAndMetadata> pCollectionIdsToConsumers;
@@ -104,7 +116,10 @@ public class PCollectionConsumerRegistry {
    *     getMultiplexingConsumer()} is called.
    */
   public <T> void register(
-      String pCollectionId, String pTransformId, FnDataReceiver<WindowedValue<T>> consumer) {
+      String pCollectionId,
+      String pTransformId,
+      FnDataReceiver<WindowedValue<T>> consumer,
+      Coder<T> valueCoder) {
     // Just save these consumers for now, but package them up later with an
     // ElementCountFnDataReceiver and possibly a MultiplexingFnDataReceiver
     // if there are multiple consumers.
@@ -124,7 +139,7 @@ public class PCollectionConsumerRegistry {
     executionStates.register(state);
 
     pCollectionIdsToConsumers.put(
-        pCollectionId, ConsumerAndMetadata.forConsumer(consumer, pTransformId, state));
+        pCollectionId, ConsumerAndMetadata.forConsumer(consumer, pTransformId, state, valueCoder));
   }
 
   /** Reset the execution states of the registered functions. */
@@ -186,8 +201,10 @@ public class PCollectionConsumerRegistry {
     private final FnDataReceiver<WindowedValue<T>> delegate;
     private final String pTransformId;
     private final SimpleExecutionState state;
-    private final Counter counter;
+    private final Counter elementCountCounter;
+    private final SampleByteSizeDistribution<T> sampledByteSizeDistribution;
     private final MetricsContainer unboundMetricContainer;
+    private final Coder<T> coder;
 
     public MetricTrackingFnDataReceiver(
         String pCollectionId, ConsumerAndMetadata consumerAndMetadata) {
@@ -196,9 +213,15 @@ public class PCollectionConsumerRegistry {
       this.pTransformId = consumerAndMetadata.getPTransformId();
       HashMap<String, String> labels = new HashMap<String, String>();
       labels.put(Labels.PCOLLECTION, pCollectionId);
-      MonitoringInfoMetricName metricName =
+      MonitoringInfoMetricName elementCountMetricName =
           MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.ELEMENT_COUNT, labels);
-      this.counter = LabeledMetrics.counter(metricName);
+      this.elementCountCounter = LabeledMetrics.counter(elementCountMetricName);
+      MonitoringInfoMetricName sampledByteSizeMetricName =
+          MonitoringInfoMetricName.named(Urns.SAMPLED_BYTE_SIZE, labels);
+      this.sampledByteSizeDistribution =
+          new SampleByteSizeDistribution<>(LabeledMetrics.distribution(sampledByteSizeMetricName));
+      this.coder = consumerAndMetadata.getValueCoder();
+
       // Collect the metric in a metric container which is not bound to the step name.
       // This is required to count elements from impulse steps, which will produce elements outside
       // of a pTransform context.
@@ -210,8 +233,9 @@ public class PCollectionConsumerRegistry {
       try (Closeable close =
           MetricsEnvironment.scopedMetricsContainer(this.unboundMetricContainer)) {
         // Increment the counter for each window the element occurs in.
-        this.counter.inc(input.getWindows().size());
-
+        this.elementCountCounter.inc(input.getWindows().size());
+        // TODO(BEAM-11879): Consider updating size per window when we have window optimization.
+        this.sampledByteSizeDistribution.tryUpdate(input.getValue(), this.coder);
         // Wrap the consumer with extra logic to set the metric container with the appropriate
         // PTransform context. This ensures that user metrics obtain the pTransform ID when they are
         // created. Also use the ExecutionStateTracker and enter an appropriate state to track the
@@ -236,7 +260,8 @@ public class PCollectionConsumerRegistry {
   private class MultiplexingMetricTrackingFnDataReceiver<T>
       implements FnDataReceiver<WindowedValue<T>> {
     private final List<ConsumerAndMetadata> consumerAndMetadatas;
-    private final Counter counter;
+    private final Counter elementCountCounter;
+    private final SampleByteSizeDistribution<T> sampledByteSizeDistribution;
     private final MetricsContainer unboundMetricContainer;
 
     public MultiplexingMetricTrackingFnDataReceiver(
@@ -244,9 +269,13 @@ public class PCollectionConsumerRegistry {
       this.consumerAndMetadatas = consumerAndMetadatas;
       HashMap<String, String> labels = new HashMap<String, String>();
       labels.put(Labels.PCOLLECTION, pCollectionId);
-      MonitoringInfoMetricName metricName =
+      MonitoringInfoMetricName elementCountMetricName =
           MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.ELEMENT_COUNT, labels);
-      this.counter = LabeledMetrics.counter(metricName);
+      this.elementCountCounter = LabeledMetrics.counter(elementCountMetricName);
+      MonitoringInfoMetricName sampledByteSizeMetricName =
+          MonitoringInfoMetricName.named(Urns.SAMPLED_BYTE_SIZE, labels);
+      this.sampledByteSizeDistribution =
+          new SampleByteSizeDistribution<>(LabeledMetrics.distribution(sampledByteSizeMetricName));
       // Collect the metric in a metric container which is not bound to the step name.
       // This is required to count elements from impulse steps, which will produce elements outside
       // of a pTransform context.
@@ -258,13 +287,18 @@ public class PCollectionConsumerRegistry {
       try (Closeable close =
           MetricsEnvironment.scopedMetricsContainer(this.unboundMetricContainer)) {
         // Increment the counter for each window the element occurs in.
-        this.counter.inc(input.getWindows().size());
-
+        this.elementCountCounter.inc(input.getWindows().size());
         // Wrap the consumer with extra logic to set the metric container with the appropriate
         // PTransform context. This ensures that user metrics obtain the pTransform ID when they are
         // created. Also use the ExecutionStateTracker and enter an appropriate state to track the
         // Process Bundle Execution time metric.
         for (ConsumerAndMetadata consumerAndMetadata : consumerAndMetadatas) {
+
+          if (consumerAndMetadata.getValueCoder() != null) {
+            // TODO(BEAM-11879): Consider updating size per window when we have window optimization.
+            this.sampledByteSizeDistribution.tryUpdate(
+                input.getValue(), consumerAndMetadata.getValueCoder());
+          }
           MetricsContainerImpl container =
               metricsContainerRegistry.getContainer(consumerAndMetadata.getPTransformId());
           try (Closeable closeable = MetricsEnvironment.scopedMetricsContainer(container)) {
@@ -305,4 +339,61 @@ public class PCollectionConsumerRegistry {
       return delegate.getProgress();
     }
   }
+
+  private static class SampleByteSizeDistribution<T> {
+    /** Basic implementation of {@link ElementByteSizeObserver} for use in size estimation. */
+    private static class ByteSizeObserver extends ElementByteSizeObserver {
+      private long observedSize = 0;
+
+      @Override
+      protected void reportElementSize(long elementSize) {
+        observedSize += elementSize;
+      }
+    }
+
+    final Distribution distribution;
+
+    public SampleByteSizeDistribution(Distribution distribution) {
+      this.distribution = distribution;
+    }
+
+    public void tryUpdate(T value, Coder<T> coder) throws Exception {
+      if (shouldSampleElement()) {
+        // First try using byte size observer
+        ByteSizeObserver observer = new ByteSizeObserver();
+        coder.registerByteSizeObserver(value, observer);
+
+        if (!observer.getIsLazy()) {
+          observer.advance();
+          this.distribution.update(observer.observedSize);
+        } else {
+          // TODO(BEAM-11841): Optimize calculation of element size for iterables.
+          // Coder byte size observation is lazy (requires iteration for observation) so fall back
+          // to counting output stream
+          CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream());
+          coder.encode(value, os);
+          this.distribution.update(os.getCount());
+        }
+      }
+    }
+
+    // Lowest sampling probability: SAMPLING_CUTOFF / SAMPLING_TOKEN_UPPER_BOUND = 0.001%.
+    private static final int SAMPLING_TOKEN_UPPER_BOUND = 1000000;
+    private static final int SAMPLING_CUTOFF = 10;
+    private int samplingToken = 0;
+    private Random randomGenerator = new Random();
+
+    // TODO(BEAM-11836): Implement fast approximation for reservoir sampling.
+    private boolean shouldSampleElement() {
+      // Sampling probability decreases as the element count increases.
+      // We unconditionally sample the first SAMPLING_CUTOFF elements. Over the
+      // next SAMPLING_CUTOFF elements, the sampling probability drops from 100%
+      // to 50%. The probability of sampling the Nth element is
+      // min(1, SAMPLING_CUTOFF / N), with an additional lower bound of
+      // SAMPLING_CUTOFF / SAMPLING_TOKEN_UPPER_BOUND. This algorithm may be
+      // refined later.
+      samplingToken = Math.min(samplingToken + 1, SAMPLING_TOKEN_UPPER_BOUND);
+      return randomGenerator.nextInt(samplingToken) < SAMPLING_CUTOFF;
+    }
+  }
 }
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/AssignWindowsRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/AssignWindowsRunnerTest.java
index c72e5ad..51884cb 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/AssignWindowsRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/AssignWindowsRunnerTest.java
@@ -28,9 +28,11 @@ import java.util.Collection;
 import java.util.Collections;
 import org.apache.beam.fn.harness.AssignWindowsRunner.AssignWindowsMapFnFactory;
 import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
+import org.apache.beam.runners.core.construction.CoderTranslation;
 import org.apache.beam.runners.core.construction.Environments;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.SdkComponents;
@@ -38,6 +40,7 @@ import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.function.ThrowingFunction;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -176,9 +179,13 @@ public class AssignWindowsRunnerTest implements Serializable {
     PCollectionConsumerRegistry pCollectionConsumerRegistry =
         new PCollectionConsumerRegistry(
             metricsContainerRegistry, mock(ExecutionStateTracker.class));
-    pCollectionConsumerRegistry.register("output", "ptransform", outputs::add);
+    pCollectionConsumerRegistry.register("output", "ptransform", outputs::add, VarIntCoder.of());
     SdkComponents components = SdkComponents.create();
     components.registerEnvironment(Environments.createDockerEnvironment("java"));
+    RunnerApi.PCollection pCollection =
+        RunnerApi.PCollection.newBuilder().setUniqueName("input").setCoderId("coder-id").build();
+    RunnerApi.Coder coder = CoderTranslation.toProto(VarIntCoder.of()).getCoder();
+
     MapFnRunners.forWindowedValueMapFnFactory(new AssignWindowsMapFnFactory<>())
         .createRunnerForPTransform(
             null /* pipelineOptions */,
@@ -200,9 +207,9 @@ public class AssignWindowsRunnerTest implements Serializable {
                                 .toByteString()))
                 .build(),
             null /* processBundleInstructionId */,
-            null /* pCollections */,
-            null /* coders */,
-            null /* windowingStrategies */,
+            Collections.singletonMap("input", pCollection) /* pCollections */,
+            Collections.singletonMap("coder-id", coder) /* coders */,
+            Collections.emptyMap() /* windowingStrategies */,
             pCollectionConsumerRegistry,
             null /* startFunctionRegistry */,
             null /* finishFunctionRegistry */,
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
index 81beeeb..246dd56 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
@@ -159,7 +159,8 @@ public class BeamFnDataReadRunnerTest {
       consumers.register(
           localOutputId,
           pTransformId,
-          (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) outputValues::add);
+          (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) outputValues::add,
+          StringUtf8Coder.of());
       PTransformFunctionRegistry startFunctionRegistry =
           new PTransformFunctionRegistry(
               mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "start");
@@ -725,7 +726,7 @@ public class BeamFnDataReadRunnerTest {
         new PCollectionConsumerRegistry(
             metricsContainerRegistry, mock(ExecutionStateTracker.class));
     String localOutputId = "outputPC";
-    consumers.register(localOutputId, pTransformId, consumer);
+    consumers.register(localOutputId, pTransformId, consumer, StringUtf8Coder.of());
     PTransformFunctionRegistry startFunctionRegistry =
         new PTransformFunctionRegistry(
             mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "start");
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java
index bf42b29..f21c0d7 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java
@@ -29,9 +29,12 @@ import java.util.ArrayDeque;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
 import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.ModelCoders;
 import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.runners.core.construction.SdkComponents;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
@@ -134,8 +137,8 @@ public class CombineRunnersTest {
     consumers.register(
         Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
         TEST_COMBINE_ID,
-        (FnDataReceiver)
-            (FnDataReceiver<WindowedValue<KV<String, Integer>>>) mainOutputValues::add);
+        (FnDataReceiver) (FnDataReceiver<WindowedValue<KV<String, Integer>>>) mainOutputValues::add,
+        KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
 
     PTransformFunctionRegistry startFunctionRegistry =
         new PTransformFunctionRegistry(
@@ -213,9 +216,8 @@ public class CombineRunnersTest {
     consumers.register(
         Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
         TEST_COMBINE_ID,
-        (FnDataReceiver)
-            (FnDataReceiver<WindowedValue<KV<String, Integer>>>) mainOutputValues::add);
-
+        (FnDataReceiver) (FnDataReceiver<WindowedValue<KV<String, Integer>>>) mainOutputValues::add,
+        KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
     PTransformFunctionRegistry startFunctionRegistry =
         new PTransformFunctionRegistry(
             mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "start");
@@ -223,6 +225,31 @@ public class CombineRunnersTest {
         new PTransformFunctionRegistry(
             mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "finish");
 
+    // Create coder map for size estimation
+    RunnerApi.PCollection pCollection =
+        RunnerApi.PCollection.newBuilder()
+            .setUniqueName(inputPCollectionId)
+            .setCoderId("coder-id")
+            .build();
+    Map<String, RunnerApi.PCollection> pCollectionMap =
+        new HashMap<>(pProto.getComponents().getPcollectionsMap());
+    pCollectionMap.put(inputPCollectionId, pCollection);
+    Map<String, RunnerApi.Coder> coderMap = new HashMap<>(pProto.getComponents().getCodersMap());
+    coderMap.put(
+        "coder-id",
+        RunnerApi.Coder.newBuilder()
+            .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(ModelCoders.KV_CODER_URN).build())
+            .addComponentCoderIds("StringUtf8Coder")
+            .addComponentCoderIds("coder-id-iterable")
+            .build());
+    coderMap.put(
+        "coder-id-iterable",
+        RunnerApi.Coder.newBuilder()
+            .setSpec(
+                RunnerApi.FunctionSpec.newBuilder().setUrn(ModelCoders.ITERABLE_CODER_URN).build())
+            .addComponentCoderIds("BigEndianIntegerCoder")
+            .build());
+
     // Create runner.
     MapFnRunners.forValueMapFnFactory(CombineRunners::createMergeAccumulatorsMapFunction)
         .createRunnerForPTransform(
@@ -233,8 +260,8 @@ public class CombineRunnersTest {
             TEST_COMBINE_ID,
             pTransform,
             null,
-            Collections.emptyMap(),
-            Collections.emptyMap(),
+            pCollectionMap,
+            coderMap,
             Collections.emptyMap(),
             consumers,
             startFunctionRegistry,
@@ -280,8 +307,8 @@ public class CombineRunnersTest {
     consumers.register(
         Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
         TEST_COMBINE_ID,
-        (FnDataReceiver)
-            (FnDataReceiver<WindowedValue<KV<String, Integer>>>) mainOutputValues::add);
+        (FnDataReceiver) (FnDataReceiver<WindowedValue<KV<String, Integer>>>) mainOutputValues::add,
+        KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
 
     PTransformFunctionRegistry startFunctionRegistry =
         new PTransformFunctionRegistry(
@@ -290,6 +317,24 @@ public class CombineRunnersTest {
         new PTransformFunctionRegistry(
             mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "finish");
 
+    // Create coder map for size estimation
+    RunnerApi.PCollection pCollection =
+        RunnerApi.PCollection.newBuilder()
+            .setUniqueName(inputPCollectionId)
+            .setCoderId("coder-id")
+            .build();
+    Map<String, RunnerApi.PCollection> pCollectionMap =
+        new HashMap<>(pProto.getComponents().getPcollectionsMap());
+    pCollectionMap.put(inputPCollectionId, pCollection);
+    Map<String, RunnerApi.Coder> coderMap = new HashMap<>(pProto.getComponents().getCodersMap());
+    coderMap.put(
+        "coder-id",
+        RunnerApi.Coder.newBuilder()
+            .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(ModelCoders.KV_CODER_URN).build())
+            .addComponentCoderIds("StringUtf8Coder")
+            .addComponentCoderIds("BigEndianIntegerCoder")
+            .build());
+
     // Create runner.
     MapFnRunners.forValueMapFnFactory(CombineRunners::createExtractOutputsMapFunction)
         .createRunnerForPTransform(
@@ -300,8 +345,8 @@ public class CombineRunnersTest {
             TEST_COMBINE_ID,
             pTransform,
             null,
-            Collections.emptyMap(),
-            Collections.emptyMap(),
+            pCollectionMap,
+            coderMap,
             Collections.emptyMap(),
             consumers,
             startFunctionRegistry,
@@ -347,8 +392,8 @@ public class CombineRunnersTest {
     consumers.register(
         Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
         TEST_COMBINE_ID,
-        (FnDataReceiver)
-            (FnDataReceiver<WindowedValue<KV<String, Integer>>>) mainOutputValues::add);
+        (FnDataReceiver) (FnDataReceiver<WindowedValue<KV<String, Integer>>>) mainOutputValues::add,
+        KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
 
     PTransformFunctionRegistry startFunctionRegistry =
         new PTransformFunctionRegistry(
@@ -367,8 +412,8 @@ public class CombineRunnersTest {
             TEST_COMBINE_ID,
             pTransform,
             null,
-            Collections.emptyMap(),
-            Collections.emptyMap(),
+            pProto.getComponents().getPcollectionsMap(),
+            pProto.getComponents().getCodersMap(),
             Collections.emptyMap(),
             consumers,
             startFunctionRegistry,
@@ -413,8 +458,8 @@ public class CombineRunnersTest {
     consumers.register(
         Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
         TEST_COMBINE_ID,
-        (FnDataReceiver)
-            (FnDataReceiver<WindowedValue<KV<String, Integer>>>) mainOutputValues::add);
+        (FnDataReceiver) (FnDataReceiver<WindowedValue<KV<String, Integer>>>) mainOutputValues::add,
+        KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
 
     PTransformFunctionRegistry startFunctionRegistry =
         new PTransformFunctionRegistry(
@@ -423,6 +468,31 @@ public class CombineRunnersTest {
         new PTransformFunctionRegistry(
             mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "finish");
 
+    // Create coder map for size estimation
+    RunnerApi.PCollection pCollection =
+        RunnerApi.PCollection.newBuilder()
+            .setUniqueName(inputPCollectionId)
+            .setCoderId("coder-id")
+            .build();
+    Map<String, RunnerApi.PCollection> pCollectionMap =
+        new HashMap<>(pProto.getComponents().getPcollectionsMap());
+    pCollectionMap.put(inputPCollectionId, pCollection);
+    Map<String, RunnerApi.Coder> coderMap = new HashMap<>(pProto.getComponents().getCodersMap());
+    coderMap.put(
+        "coder-id",
+        RunnerApi.Coder.newBuilder()
+            .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(ModelCoders.KV_CODER_URN).build())
+            .addComponentCoderIds("StringUtf8Coder")
+            .addComponentCoderIds("coder-id-iterable")
+            .build());
+    coderMap.put(
+        "coder-id-iterable",
+        RunnerApi.Coder.newBuilder()
+            .setSpec(
+                RunnerApi.FunctionSpec.newBuilder().setUrn(ModelCoders.ITERABLE_CODER_URN).build())
+            .addComponentCoderIds("StringUtf8Coder")
+            .build());
+
     // Create runner.
     MapFnRunners.forValueMapFnFactory(CombineRunners::createCombineGroupedValuesMapFunction)
         .createRunnerForPTransform(
@@ -433,8 +503,8 @@ public class CombineRunnersTest {
             TEST_COMBINE_ID,
             pTransform,
             null,
-            Collections.emptyMap(),
-            Collections.emptyMap(),
+            pCollectionMap,
+            coderMap,
             Collections.emptyMap(),
             consumers,
             startFunctionRegistry,
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FlattenRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FlattenRunnerTest.java
index 0bc5759..64e753c 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FlattenRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FlattenRunnerTest.java
@@ -26,12 +26,17 @@ import static org.mockito.Mockito.mock;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
+import org.apache.beam.runners.core.construction.CoderTranslation;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -78,8 +83,28 @@ public class FlattenRunnerTest {
     consumers.register(
         "mainOutputTarget",
         pTransformId,
-        (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) mainOutputValues::add);
-
+        (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) mainOutputValues::add,
+        StringUtf8Coder.of());
+    Map<String, PCollection> pCollectionMap = new HashMap<>();
+    pCollectionMap.put(
+        "inputATarget",
+        RunnerApi.PCollection.newBuilder()
+            .setUniqueName("inputATarget")
+            .setCoderId("coder-id")
+            .build());
+    pCollectionMap.put(
+        "inputBTarget",
+        RunnerApi.PCollection.newBuilder()
+            .setUniqueName("inputBTarget")
+            .setCoderId("coder-id")
+            .build());
+    pCollectionMap.put(
+        "inputCTarget",
+        RunnerApi.PCollection.newBuilder()
+            .setUniqueName("inputCTarget")
+            .setCoderId("coder-id")
+            .build());
+    RunnerApi.Coder coder = CoderTranslation.toProto(StringUtf8Coder.of()).getCoder();
     new FlattenRunner.Factory<>()
         .createRunnerForPTransform(
             PipelineOptionsFactory.create(),
@@ -89,8 +114,8 @@ public class FlattenRunnerTest {
             pTransformId,
             pTransform,
             Suppliers.ofInstance("57L")::get,
-            Collections.emptyMap(),
-            Collections.emptyMap(),
+            pCollectionMap,
+            Collections.singletonMap("coder-id", coder),
             Collections.emptyMap(),
             consumers,
             null /* startFunctionRegistry */,
@@ -129,7 +154,6 @@ public class FlattenRunnerTest {
   public void testFlattenWithDuplicateInputCollectionProducesMultipleOutputs() throws Exception {
     String pTransformId = "pTransformId";
     String mainOutputId = "101";
-
     RunnerApi.FunctionSpec functionSpec =
         RunnerApi.FunctionSpec.newBuilder()
             .setUrn(PTransformTranslation.FLATTEN_TRANSFORM_URN)
@@ -150,8 +174,15 @@ public class FlattenRunnerTest {
     consumers.register(
         "mainOutputTarget",
         pTransformId,
-        (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) mainOutputValues::add);
+        (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) mainOutputValues::add,
+        StringUtf8Coder.of());
 
+    RunnerApi.PCollection pCollection =
+        RunnerApi.PCollection.newBuilder()
+            .setUniqueName("inputATarget")
+            .setCoderId("coder-id")
+            .build();
+    RunnerApi.Coder coder = CoderTranslation.toProto(StringUtf8Coder.of()).getCoder();
     new FlattenRunner.Factory<>()
         .createRunnerForPTransform(
             PipelineOptionsFactory.create(),
@@ -161,8 +192,8 @@ public class FlattenRunnerTest {
             pTransformId,
             pTransform,
             Suppliers.ofInstance("57L")::get,
-            Collections.emptyMap(),
-            Collections.emptyMap(),
+            Collections.singletonMap("inputATarget", pCollection),
+            Collections.singletonMap("coder-id", coder),
             Collections.emptyMap(),
             consumers,
             null /* startFunctionRegistry */,
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index 6db8c09..365135f 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -74,11 +74,13 @@ import org.apache.beam.runners.core.construction.RehydratedComponents;
 import org.apache.beam.runners.core.construction.SdkComponents;
 import org.apache.beam.runners.core.construction.graph.ProtoOverrides;
 import org.apache.beam.runners.core.construction.graph.SplittableParDoExpander;
+import org.apache.beam.runners.core.metrics.DistributionData;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants.Urns;
 import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
@@ -263,10 +265,12 @@ public class FnApiDoFnRunnerTest implements Serializable {
       PCollectionConsumerRegistry consumers =
           new PCollectionConsumerRegistry(
               metricsContainerRegistry, mock(ExecutionStateTracker.class));
+
       consumers.register(
           outputPCollectionId,
           TEST_TRANSFORM_ID,
-          (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) mainOutputValues::add);
+          (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) mainOutputValues::add,
+          StringUtf8Coder.of());
       PTransformFunctionRegistry startFunctionRegistry =
           new PTransformFunctionRegistry(
               mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "start");
@@ -444,11 +448,13 @@ public class FnApiDoFnRunnerTest implements Serializable {
       consumers.register(
           outputPCollectionId,
           TEST_TRANSFORM_ID,
-          (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) mainOutputValues::add);
+          (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) mainOutputValues::add,
+          StringUtf8Coder.of());
       consumers.register(
           additionalPCollectionId,
           TEST_TRANSFORM_ID,
-          (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) additionalOutputValues::add);
+          (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) additionalOutputValues::add,
+          StringUtf8Coder.of());
       PTransformFunctionRegistry startFunctionRegistry =
           new PTransformFunctionRegistry(
               mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "start");
@@ -570,11 +576,13 @@ public class FnApiDoFnRunnerTest implements Serializable {
       consumers.register(
           outputPCollectionId,
           TEST_TRANSFORM_ID,
-          (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) mainOutputValues::add);
+          (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) mainOutputValues::add,
+          StringUtf8Coder.of());
       consumers.register(
           additionalPCollectionId,
           TEST_TRANSFORM_ID,
-          (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) additionalOutputValues::add);
+          (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) additionalOutputValues::add,
+          StringUtf8Coder.of());
       PTransformFunctionRegistry startFunctionRegistry =
           new PTransformFunctionRegistry(
               mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "start");
@@ -731,7 +739,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
       consumers.register(
           Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
           TEST_TRANSFORM_ID,
-          (FnDataReceiver) (FnDataReceiver<WindowedValue<Iterable<String>>>) mainOutputValues::add);
+          (FnDataReceiver) (FnDataReceiver<WindowedValue<Iterable<String>>>) mainOutputValues::add,
+          IterableCoder.of(StringUtf8Coder.of()));
       PTransformFunctionRegistry startFunctionRegistry =
           new PTransformFunctionRegistry(
               mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "start");
@@ -851,7 +860,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
       consumers.register(
           Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
           TEST_TRANSFORM_ID,
-          (FnDataReceiver) (FnDataReceiver<WindowedValue<Iterable<String>>>) mainOutputValues::add);
+          (FnDataReceiver) (FnDataReceiver<WindowedValue<Iterable<String>>>) mainOutputValues::add,
+          IterableCoder.of(StringUtf8Coder.of()));
       PTransformFunctionRegistry startFunctionRegistry =
           new PTransformFunctionRegistry(
               mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "start");
@@ -931,6 +941,21 @@ public class FnApiDoFnRunnerTest implements Serializable {
       builder.setInt64SumValue(2);
       expected.add(builder.build());
 
+      builder = new SimpleMonitoringInfoBuilder();
+      builder.setUrn(MonitoringInfoConstants.Urns.SAMPLED_BYTE_SIZE);
+      builder.setLabel(
+          MonitoringInfoConstants.Labels.PCOLLECTION, "Window.Into()/Window.Assign.out");
+      builder.setInt64DistributionValue(DistributionData.create(4, 2, 2, 2));
+      expected.add(builder.build());
+
+      builder = new SimpleMonitoringInfoBuilder();
+      builder.setUrn(Urns.SAMPLED_BYTE_SIZE);
+      builder.setLabel(
+          MonitoringInfoConstants.Labels.PCOLLECTION,
+          "pTransformId/ParMultiDo(TestSideInputIsAccessibleForDownstreamCallers).output");
+      builder.setInt64DistributionValue(DistributionData.create(10, 2, 5, 5));
+      expected.add(builder.build());
+
       closeable.close();
       List<MonitoringInfo> result = new ArrayList<MonitoringInfo>();
       for (MonitoringInfo mi : metricsContainerRegistry.getMonitoringInfos()) {
@@ -982,7 +1007,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
       consumers.register(
           outputPCollectionId,
           TEST_TRANSFORM_ID,
-          (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) mainOutputValues::add);
+          (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) mainOutputValues::add,
+          StringUtf8Coder.of());
 
       PTransformFunctionRegistry startFunctionRegistry =
           new PTransformFunctionRegistry(
@@ -1647,7 +1673,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
       consumers.register(
           outputPCollectionId,
           TEST_TRANSFORM_ID,
-          (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) mainOutputValues::add);
+          (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) mainOutputValues::add,
+          StringUtf8Coder.of());
       PTransformFunctionRegistry startFunctionRegistry =
           new PTransformFunctionRegistry(
               mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "start");
@@ -1972,7 +1999,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
       consumers.register(
           outputPCollectionId,
           TEST_TRANSFORM_ID,
-          (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) mainOutputValues::add);
+          (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) mainOutputValues::add,
+          StringUtf8Coder.of());
       PTransformFunctionRegistry startFunctionRegistry =
           new PTransformFunctionRegistry(
               mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "start");
@@ -2360,7 +2388,6 @@ public class FnApiDoFnRunnerTest implements Serializable {
           TEST_TRANSFORM_ID,
           ParDo.of(new WindowObservingTestSplittableDoFn(singletonSideInputView))
               .withSideInputs(singletonSideInputView));
-
       RunnerApi.Pipeline pProto =
           ProtoOverrides.updateTransform(
               PTransformTranslation.PAR_DO_TRANSFORM_URN,
@@ -2390,7 +2417,11 @@ public class FnApiDoFnRunnerTest implements Serializable {
       PCollectionConsumerRegistry consumers =
           new PCollectionConsumerRegistry(
               metricsContainerRegistry, mock(ExecutionStateTracker.class));
-      consumers.register(outputPCollectionId, TEST_TRANSFORM_ID, ((List) mainOutputValues)::add);
+      consumers.register(
+          outputPCollectionId,
+          TEST_TRANSFORM_ID,
+          ((List) mainOutputValues)::add,
+          KvCoder.of(StringUtf8Coder.of(), KvCoder.of(OffsetRange.Coder.of(), InstantCoder.of())));
       PTransformFunctionRegistry startFunctionRegistry =
           new PTransformFunctionRegistry(
               mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "start");
@@ -2398,7 +2429,6 @@ public class FnApiDoFnRunnerTest implements Serializable {
           new PTransformFunctionRegistry(
               mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "finish");
       List<ThrowingRunnable> teardownFunctions = new ArrayList<>();
-
       new FnApiDoFnRunner.Factory<>()
           .createRunnerForPTransform(
               PipelineOptionsFactory.create(),
@@ -2489,7 +2519,11 @@ public class FnApiDoFnRunnerTest implements Serializable {
       PCollectionConsumerRegistry consumers =
           new PCollectionConsumerRegistry(
               metricsContainerRegistry, mock(ExecutionStateTracker.class));
-      consumers.register(outputPCollectionId, TEST_TRANSFORM_ID, ((List) mainOutputValues)::add);
+      consumers.register(
+          outputPCollectionId,
+          TEST_TRANSFORM_ID,
+          ((List) mainOutputValues)::add,
+          KvCoder.of(StringUtf8Coder.of(), KvCoder.of(OffsetRange.Coder.of(), InstantCoder.of())));
       PTransformFunctionRegistry startFunctionRegistry =
           new PTransformFunctionRegistry(
               mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "start");
@@ -2607,7 +2641,11 @@ public class FnApiDoFnRunnerTest implements Serializable {
       PCollectionConsumerRegistry consumers =
           new PCollectionConsumerRegistry(
               metricsContainerRegistry, mock(ExecutionStateTracker.class));
-      consumers.register(outputPCollectionId, TEST_TRANSFORM_ID, ((List) mainOutputValues)::add);
+      consumers.register(
+          outputPCollectionId,
+          TEST_TRANSFORM_ID,
+          ((List) mainOutputValues)::add,
+          KvCoder.of(StringUtf8Coder.of(), KvCoder.of(OffsetRange.Coder.of(), InstantCoder.of())));
       PTransformFunctionRegistry startFunctionRegistry =
           new PTransformFunctionRegistry(
               mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "start");
@@ -2715,7 +2753,13 @@ public class FnApiDoFnRunnerTest implements Serializable {
       PCollectionConsumerRegistry consumers =
           new PCollectionConsumerRegistry(
               metricsContainerRegistry, mock(ExecutionStateTracker.class));
-      consumers.register(outputPCollectionId, TEST_TRANSFORM_ID, ((List) mainOutputValues)::add);
+      Coder coder =
+          KvCoder.of(
+              KvCoder.of(
+                  StringUtf8Coder.of(), KvCoder.of(OffsetRange.Coder.of(), InstantCoder.of())),
+              DoubleCoder.of());
+      consumers.register(
+          outputPCollectionId, TEST_TRANSFORM_ID, ((List) mainOutputValues)::add, coder);
       PTransformFunctionRegistry startFunctionRegistry =
           new PTransformFunctionRegistry(
               mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "start");
@@ -2828,7 +2872,13 @@ public class FnApiDoFnRunnerTest implements Serializable {
       PCollectionConsumerRegistry consumers =
           new PCollectionConsumerRegistry(
               metricsContainerRegistry, mock(ExecutionStateTracker.class));
-      consumers.register(outputPCollectionId, TEST_TRANSFORM_ID, ((List) mainOutputValues)::add);
+      Coder coder =
+          KvCoder.of(
+              KvCoder.of(
+                  StringUtf8Coder.of(), KvCoder.of(OffsetRange.Coder.of(), InstantCoder.of())),
+              DoubleCoder.of());
+      consumers.register(
+          outputPCollectionId, TEST_TRANSFORM_ID, ((List) mainOutputValues)::add, coder);
       PTransformFunctionRegistry startFunctionRegistry =
           new PTransformFunctionRegistry(
               mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "start");
@@ -2985,7 +3035,13 @@ public class FnApiDoFnRunnerTest implements Serializable {
       PCollectionConsumerRegistry consumers =
           new PCollectionConsumerRegistry(
               metricsContainerRegistry, mock(ExecutionStateTracker.class));
-      consumers.register(outputPCollectionId, TEST_TRANSFORM_ID, ((List) mainOutputValues)::add);
+      Coder coder =
+          KvCoder.of(
+              KvCoder.of(
+                  StringUtf8Coder.of(), KvCoder.of(OffsetRange.Coder.of(), InstantCoder.of())),
+              DoubleCoder.of());
+      consumers.register(
+          outputPCollectionId, TEST_TRANSFORM_ID, ((List) mainOutputValues)::add, coder);
       PTransformFunctionRegistry startFunctionRegistry =
           new PTransformFunctionRegistry(
               mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "start");
@@ -3194,10 +3250,16 @@ public class FnApiDoFnRunnerTest implements Serializable {
       PCollectionConsumerRegistry consumers =
           new PCollectionConsumerRegistry(
               metricsContainerRegistry, mock(ExecutionStateTracker.class));
+      Coder coder =
+          KvCoder.of(
+              KvCoder.of(
+                  StringUtf8Coder.of(), KvCoder.of(OffsetRange.Coder.of(), InstantCoder.of())),
+              DoubleCoder.of());
       consumers.register(
           outputPCollectionId,
           TEST_TRANSFORM_ID,
-          (FnDataReceiver) new SplittableFnDataReceiver(mainOutputValues));
+          (FnDataReceiver) new SplittableFnDataReceiver(mainOutputValues),
+          coder);
       PTransformFunctionRegistry startFunctionRegistry =
           new PTransformFunctionRegistry(
               mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "start");
@@ -3363,10 +3425,13 @@ public class FnApiDoFnRunnerTest implements Serializable {
       PCollectionConsumerRegistry consumers =
           new PCollectionConsumerRegistry(
               metricsContainerRegistry, mock(ExecutionStateTracker.class));
+      Coder coder =
+          KvCoder.of(KvCoder.of(StringUtf8Coder.of(), OffsetRange.Coder.of()), DoubleCoder.of());
       consumers.register(
           outputPCollectionId,
           TEST_TRANSFORM_ID,
-          (FnDataReceiver) new SplittableFnDataReceiver(mainOutputValues));
+          (FnDataReceiver) new SplittableFnDataReceiver(mainOutputValues),
+          coder);
       PTransformFunctionRegistry startFunctionRegistry =
           new PTransformFunctionRegistry(
               mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "start");
@@ -3440,10 +3505,16 @@ public class FnApiDoFnRunnerTest implements Serializable {
       PCollectionConsumerRegistry consumers =
           new PCollectionConsumerRegistry(
               metricsContainerRegistry, mock(ExecutionStateTracker.class));
+      Coder coder =
+          KvCoder.of(
+              KvCoder.of(
+                  StringUtf8Coder.of(), KvCoder.of(OffsetRange.Coder.of(), InstantCoder.of())),
+              DoubleCoder.of());
       consumers.register(
           outputPCollectionId,
           TEST_TRANSFORM_ID,
-          (FnDataReceiver) new SplittableFnDataReceiver(mainOutputValues));
+          (FnDataReceiver) new SplittableFnDataReceiver(mainOutputValues),
+          coder);
       PTransformFunctionRegistry startFunctionRegistry =
           new PTransformFunctionRegistry(
               mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "start");
@@ -3554,10 +3625,16 @@ public class FnApiDoFnRunnerTest implements Serializable {
       PCollectionConsumerRegistry consumers =
           new PCollectionConsumerRegistry(
               metricsContainerRegistry, mock(ExecutionStateTracker.class));
+      Coder coder =
+          KvCoder.of(
+              KvCoder.of(
+                  StringUtf8Coder.of(), KvCoder.of(OffsetRange.Coder.of(), InstantCoder.of())),
+              DoubleCoder.of());
       consumers.register(
           outputPCollectionId,
           TEST_TRANSFORM_ID,
-          (FnDataReceiver) new SplittableFnDataReceiver(mainOutputValues));
+          (FnDataReceiver) new SplittableFnDataReceiver(mainOutputValues),
+          coder);
       PTransformFunctionRegistry startFunctionRegistry =
           new PTransformFunctionRegistry(
               mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "start");
@@ -3690,10 +3767,16 @@ public class FnApiDoFnRunnerTest implements Serializable {
       PCollectionConsumerRegistry consumers =
           new PCollectionConsumerRegistry(
               metricsContainerRegistry, mock(ExecutionStateTracker.class));
+      Coder coder =
+          KvCoder.of(
+              KvCoder.of(
+                  StringUtf8Coder.of(), KvCoder.of(OffsetRange.Coder.of(), InstantCoder.of())),
+              DoubleCoder.of());
       consumers.register(
           outputPCollectionId,
           TEST_TRANSFORM_ID,
-          (FnDataReceiver) new SplittableFnDataReceiver(mainOutputValues));
+          (FnDataReceiver) new SplittableFnDataReceiver(mainOutputValues),
+          coder);
       PTransformFunctionRegistry startFunctionRegistry =
           new PTransformFunctionRegistry(
               mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "start");
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnersTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnersTest.java
index 200d640..2a83878 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnersTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnersTest.java
@@ -25,6 +25,7 @@ import static org.hamcrest.Matchers.empty;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -33,8 +34,10 @@ import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
 import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.runners.core.construction.CoderTranslation;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.function.ThrowingFunction;
 import org.apache.beam.sdk.function.ThrowingRunnable;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -62,6 +65,17 @@ public class MapFnRunnersTest {
           .putInputs("input", "inputPC")
           .putOutputs("output", "outputPC")
           .build();
+  private static final RunnerApi.PCollection INPUT_PCOLLECTION =
+      RunnerApi.PCollection.newBuilder().setUniqueName("inputPC").setCoderId("coder-id").build();
+  private static RunnerApi.Coder valueCoder;
+
+  static {
+    try {
+      valueCoder = CoderTranslation.toProto(StringUtf8Coder.of()).getCoder();
+    } catch (IOException e) {
+      throw new ExceptionInInitializerError(e);
+    }
+  }
 
   @Test
   public void testValueOnlyMapping() throws Exception {
@@ -70,7 +84,7 @@ public class MapFnRunnersTest {
     PCollectionConsumerRegistry consumers =
         new PCollectionConsumerRegistry(
             metricsContainerRegistry, mock(ExecutionStateTracker.class));
-    consumers.register("outputPC", EXPECTED_ID, outputConsumer::add);
+    consumers.register("outputPC", EXPECTED_ID, outputConsumer::add, StringUtf8Coder.of());
 
     PTransformFunctionRegistry startFunctionRegistry =
         new PTransformFunctionRegistry(
@@ -90,8 +104,8 @@ public class MapFnRunnersTest {
             EXPECTED_ID,
             EXPECTED_PTRANSFORM,
             Suppliers.ofInstance("57L")::get,
-            Collections.emptyMap(),
-            Collections.emptyMap(),
+            Collections.singletonMap("inputPC", INPUT_PCOLLECTION),
+            Collections.singletonMap("coder-id", valueCoder),
             Collections.emptyMap(),
             consumers,
             startFunctionRegistry,
@@ -120,7 +134,7 @@ public class MapFnRunnersTest {
     PCollectionConsumerRegistry consumers =
         new PCollectionConsumerRegistry(
             metricsContainerRegistry, mock(ExecutionStateTracker.class));
-    consumers.register("outputPC", EXPECTED_ID, outputConsumer::add);
+    consumers.register("outputPC", EXPECTED_ID, outputConsumer::add, StringUtf8Coder.of());
 
     PTransformFunctionRegistry startFunctionRegistry =
         new PTransformFunctionRegistry(
@@ -139,8 +153,8 @@ public class MapFnRunnersTest {
             EXPECTED_ID,
             EXPECTED_PTRANSFORM,
             Suppliers.ofInstance("57L")::get,
-            Collections.emptyMap(),
-            Collections.emptyMap(),
+            Collections.singletonMap("inputPC", INPUT_PCOLLECTION),
+            Collections.singletonMap("coder-id", valueCoder),
             Collections.emptyMap(),
             consumers,
             startFunctionRegistry,
@@ -168,7 +182,7 @@ public class MapFnRunnersTest {
     PCollectionConsumerRegistry consumers =
         new PCollectionConsumerRegistry(
             mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class));
-    consumers.register("outputPC", "pTransformId", outputConsumer::add);
+    consumers.register("outputPC", "pTransformId", outputConsumer::add, StringUtf8Coder.of());
 
     PTransformFunctionRegistry startFunctionRegistry =
         new PTransformFunctionRegistry(
@@ -187,8 +201,8 @@ public class MapFnRunnersTest {
             EXPECTED_ID,
             EXPECTED_PTRANSFORM,
             Suppliers.ofInstance("57L")::get,
-            Collections.emptyMap(),
-            Collections.emptyMap(),
+            Collections.singletonMap("inputPC", INPUT_PCOLLECTION),
+            Collections.singletonMap("coder-id", valueCoder),
             Collections.emptyMap(),
             consumers,
             startFunctionRegistry,
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistryTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistryTest.java
index cedb3fa..19e56ab 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistryTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistryTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.fn.harness.data;
 import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.any;
@@ -30,13 +31,18 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.withSettings;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.beam.fn.harness.HandlesSplits;
 import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
+import org.apache.beam.runners.core.metrics.DistributionData;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
 import org.apache.beam.runners.core.metrics.MonitoringInfoConstants.Labels;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants.Urns;
 import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -71,14 +77,14 @@ public class PCollectionConsumerRegistryTest {
             metricsContainerRegistry, mock(ExecutionStateTracker.class));
     FnDataReceiver<WindowedValue<String>> consumerA1 = mock(FnDataReceiver.class);
 
-    consumers.register(pCollectionA, pTransformIdA, consumerA1);
+    consumers.register(pCollectionA, pTransformIdA, consumerA1, StringUtf8Coder.of());
 
     FnDataReceiver<WindowedValue<String>> wrapperConsumer =
         (FnDataReceiver<WindowedValue<String>>)
             (FnDataReceiver) consumers.getMultiplexingConsumer(pCollectionA);
-
-    WindowedValue<String> element = valueInGlobalWindow("elem");
-    int numElements = 20;
+    String elementValue = "elem";
+    WindowedValue<String> element = valueInGlobalWindow(elementValue);
+    int numElements = 10;
     for (int i = 0; i < numElements; i++) {
       wrapperConsumer.accept(element);
     }
@@ -87,18 +93,29 @@ public class PCollectionConsumerRegistryTest {
     verify(consumerA1, times(numElements)).accept(element);
     assertThat(consumers.keySet(), contains(pCollectionA));
 
+    List<MonitoringInfo> expected = new ArrayList<>();
+
     SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
     builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
     builder.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, pCollectionA);
     builder.setInt64SumValue(numElements);
-    MonitoringInfo expected = builder.build();
+    expected.add(builder.build());
 
+    long elementByteSize = StringUtf8Coder.of().getEncodedElementByteSize(elementValue);
+    builder = new SimpleMonitoringInfoBuilder();
+    builder.setUrn(Urns.SAMPLED_BYTE_SIZE);
+    builder.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, pCollectionA);
+    builder.setInt64DistributionValue(
+        DistributionData.create(
+            numElements * elementByteSize, numElements, elementByteSize, elementByteSize));
+    expected.add(builder.build());
     // Clear the timestamp before comparison.
-    MonitoringInfo result =
-        Iterables.find(
+    Iterable<MonitoringInfo> result =
+        Iterables.filter(
             metricsContainerRegistry.getMonitoringInfos(),
             monitoringInfo -> monitoringInfo.containsLabels(Labels.PCOLLECTION));
-    assertEquals(expected, result);
+
+    assertThat(result, containsInAnyOrder(expected.toArray()));
   }
 
   @Test
@@ -113,7 +130,7 @@ public class PCollectionConsumerRegistryTest {
             metricsContainerRegistry, mock(ExecutionStateTracker.class));
     FnDataReceiver<WindowedValue<String>> consumer = mock(FnDataReceiver.class);
 
-    consumers.register(pCollectionA, pTransformId, consumer);
+    consumers.register(pCollectionA, pTransformId, consumer, StringUtf8Coder.of());
 
     FnDataReceiver<WindowedValue<String>> wrapperConsumer =
         (FnDataReceiver<WindowedValue<String>>)
@@ -142,8 +159,8 @@ public class PCollectionConsumerRegistryTest {
     FnDataReceiver<WindowedValue<String>> consumerA1 = mock(FnDataReceiver.class);
     FnDataReceiver<WindowedValue<String>> consumerA2 = mock(FnDataReceiver.class);
 
-    consumers.register(pCollectionA, pTransformIdA, consumerA1);
-    consumers.register(pCollectionA, pTransformIdB, consumerA2);
+    consumers.register(pCollectionA, pTransformIdA, consumerA1, StringUtf8Coder.of());
+    consumers.register(pCollectionA, pTransformIdB, consumerA2, StringUtf8Coder.of());
 
     FnDataReceiver<WindowedValue<String>> wrapperConsumer =
         (FnDataReceiver<WindowedValue<String>>)
@@ -187,8 +204,8 @@ public class PCollectionConsumerRegistryTest {
     FnDataReceiver<WindowedValue<String>> consumerA1 = mock(FnDataReceiver.class);
     FnDataReceiver<WindowedValue<String>> consumerA2 = mock(FnDataReceiver.class);
 
-    consumers.register(pCollectionA, pTransformId, consumerA1);
-    consumers.register(pCollectionA, pTransformId, consumerA2);
+    consumers.register(pCollectionA, pTransformId, consumerA1, StringUtf8Coder.of());
+    consumers.register(pCollectionA, pTransformId, consumerA2, StringUtf8Coder.of());
 
     FnDataReceiver<WindowedValue<String>> wrapperConsumer =
         (FnDataReceiver<WindowedValue<String>>)
@@ -212,12 +229,12 @@ public class PCollectionConsumerRegistryTest {
     FnDataReceiver<WindowedValue<String>> consumerA1 = mock(FnDataReceiver.class);
     FnDataReceiver<WindowedValue<String>> consumerA2 = mock(FnDataReceiver.class);
 
-    consumers.register(pCollectionA, pTransformId, consumerA1);
+    consumers.register(pCollectionA, pTransformId, consumerA1, StringUtf8Coder.of());
     consumers.getMultiplexingConsumer(pCollectionA);
 
     expectedException.expect(RuntimeException.class);
     expectedException.expectMessage("cannot be register()-d after");
-    consumers.register(pCollectionA, pTransformId, consumerA2);
+    consumers.register(pCollectionA, pTransformId, consumerA2, StringUtf8Coder.of());
   }
 
   @Test
@@ -232,8 +249,8 @@ public class PCollectionConsumerRegistryTest {
     FnDataReceiver<WindowedValue<String>> consumerA1 = mock(FnDataReceiver.class);
     FnDataReceiver<WindowedValue<String>> consumerA2 = mock(FnDataReceiver.class);
 
-    consumers.register("pCollectionA", "pTransformA", consumerA1);
-    consumers.register("pCollectionA", "pTransformB", consumerA2);
+    consumers.register("pCollectionA", "pTransformA", consumerA1, StringUtf8Coder.of());
+    consumers.register("pCollectionA", "pTransformB", consumerA2, StringUtf8Coder.of());
 
     FnDataReceiver<WindowedValue<String>> wrapperConsumer =
         (FnDataReceiver<WindowedValue<String>>)
@@ -264,7 +281,7 @@ public class PCollectionConsumerRegistryTest {
     FnDataReceiver<WindowedValue<String>> consumer =
         mock(FnDataReceiver.class, withSettings().verboseLogging());
 
-    consumers.register(pCollectionA, pTransformIdA, consumer);
+    consumers.register(pCollectionA, pTransformIdA, consumer, StringUtf8Coder.of());
 
     FnDataReceiver<WindowedValue<String>> wrapperConsumer =
         (FnDataReceiver<WindowedValue<String>>)
@@ -291,7 +308,7 @@ public class PCollectionConsumerRegistryTest {
             metricsContainerRegistry, mock(ExecutionStateTracker.class));
     SplittingReceiver consumerA1 = mock(SplittingReceiver.class);
 
-    consumers.register(pCollectionA, pTransformIdA, consumerA1);
+    consumers.register(pCollectionA, pTransformIdA, consumerA1, StringUtf8Coder.of());
 
     FnDataReceiver<WindowedValue<String>> wrapperConsumer =
         (FnDataReceiver<WindowedValue<String>>)