You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/05/15 18:13:48 UTC

[beam] branch master updated: [BEAM-6138] Update java SDK to report user distribution tuple metrics over the FN API

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e41ab8  [BEAM-6138] Update java SDK to report user distribution tuple metrics over the FN API
     new 0294039  Merge pull request #8280 from ajamato/java_user_distribution
8e41ab8 is described below

commit 8e41ab88ead825a2cc30a6b2f3dabffb748ffa2a
Author: Alex Amato <aj...@google.com>
AuthorDate: Thu Apr 11 13:00:19 2019 -0700

    [BEAM-6138] Update java SDK to report user distribution tuple metrics over the FN
    API
---
 .../runners/core/metrics/MetricsContainerImpl.java | 50 ++++++++++++++-
 .../core/metrics/MonitoringInfoConstants.java      |  2 +
 .../core/metrics/SimpleMonitoringInfoBuilder.java  | 28 ++++++++-
 .../core/metrics/SpecMonitoringInfoValidator.java  |  2 +-
 .../core/metrics/MetricsContainerImplTest.java     | 56 +++++++++++++++++
 .../metrics/SimpleMonitoringInfoBuilderTest.java   | 31 +++++++++-
 .../metrics/SpecMonitoringInfoValidatorTest.java   | 15 ++++-
 .../fnexecution/control/RemoteExecutionTest.java   | 45 ++++++++++++++
 .../org/apache/beam/runners/jet/DAGBuilder.java    |  6 +-
 .../org/apache/beam/runners/jet/JetRunner.java     |  6 +-
 .../beam/runners/jet/JetTransformTranslator.java   |  6 +-
 .../beam/runners/jet/JetTransformTranslators.java  | 72 +++++++++++++++++-----
 .../beam/runners/jet/metrics/JetMetricResults.java |  3 +-
 .../runners/jet/processors/BoundedSourceP.java     |  3 +-
 .../beam/runners/jet/processors/TestStreamP.java   | 58 +++++++++--------
 .../beam/runners/jet/processors/WindowGroupP.java  |  3 +-
 16 files changed, 325 insertions(+), 61 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
index 827c9bf..a2cd286 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
@@ -27,7 +27,6 @@ import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.MetricsApi;
 import org.apache.beam.model.pipeline.v1.MetricsApi.CounterData;
-import org.apache.beam.model.pipeline.v1.MetricsApi.DistributionData;
 import org.apache.beam.model.pipeline.v1.MetricsApi.ExtremaData;
 import org.apache.beam.model.pipeline.v1.MetricsApi.IntDistributionData;
 import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
@@ -187,6 +186,45 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {
     return builder.build();
   }
 
+  /**
+   * @param metricUpdate the distribution update to convert.
+   * @return The MonitoringInfo generated from the metricUpdate, or null if it was dropped.
+   */
+  @Nullable
+  private MonitoringInfo distributionUpdateToMonitoringInfo(
+      MetricUpdate<org.apache.beam.runners.core.metrics.DistributionData> metricUpdate) {
+    SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder(true);
+    MetricName metricName = metricUpdate.getKey().metricName();
+    if (metricName instanceof MonitoringInfoMetricName) {
+      MonitoringInfoMetricName monitoringInfoName = (MonitoringInfoMetricName) metricName;
+      // Represents a specific MonitoringInfo for a specific URN.
+      builder.setUrn(monitoringInfoName.getUrn());
+      for (Entry<String, String> e : monitoringInfoName.getLabels().entrySet()) {
+        builder.setLabel(e.getKey(), e.getValue());
+      }
+    } else { // Note: (metricName instanceof MetricName) is always True.
+      // Represents a user distribution.
+      builder
+          .setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_COUNTER)
+          .setLabel(
+              MonitoringInfoConstants.Labels.NAMESPACE,
+              metricUpdate.getKey().metricName().getNamespace())
+          .setLabel(
+              MonitoringInfoConstants.Labels.NAME, metricUpdate.getKey().metricName().getName());
+
+      // Drop if the step name is not set. All user distributions must be
+      // defined for a PTransform. They must be defined on a container bound to a step.
+      if (this.stepName == null) {
+        // TODO(BEAM-7191): Consider logging a warning with a quiet logging API.
+        return null;
+      }
+      builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, metricUpdate.getKey().stepName());
+    }
+    builder.setInt64DistributionValue(metricUpdate.getUpdate());
+    builder.setTimestampToNow();
+    return builder.build();
+  }
+
   /** Return the cumulative values for any metrics in this container as MonitoringInfos. */
   public Iterable<MonitoringInfo> getMonitoringInfos() {
     // Extract user metrics and store as MonitoringInfos.
@@ -199,6 +237,14 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {
         monitoringInfos.add(mi);
       }
     }
+
+    for (MetricUpdate<org.apache.beam.runners.core.metrics.DistributionData> metricUpdate :
+        metricUpdates.distributionUpdates()) {
+      MonitoringInfo mi = distributionUpdateToMonitoringInfo(metricUpdate);
+      if (mi != null) {
+        monitoringInfos.add(mi);
+      }
+    }
     return monitoringInfos;
   }
 
@@ -266,7 +312,7 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {
               LOG.warn("Unsupported CounterData type: {}", counterData);
             }
           } else if (metric.hasDistributionData()) {
-            DistributionData distributionData = metric.getDistributionData();
+            MetricsApi.DistributionData distributionData = metric.getDistributionData();
             if (distributionData.hasIntDistributionData()) {
               Distribution distribution = getDistribution(metricName);
               IntDistributionData intDistributionData = distributionData.getIntDistributionData();
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
index 3716709..ecfd38d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
@@ -42,6 +42,8 @@ public final class MonitoringInfoConstants {
     public static final String USER_COUNTER = extractUrn(MonitoringInfoSpecs.Enum.USER_COUNTER);
     public static final String USER_DISTRIBUTION_COUNTER =
         extractUrn(MonitoringInfoSpecs.Enum.USER_DISTRIBUTION_COUNTER);
+    public static final String SAMPLED_BYTE_SIZE =
+        extractUrn(MonitoringInfoSpecs.Enum.SAMPLED_BYTE_SIZE);
   }
 
   /** Standardised MonitoringInfo labels that can be utilized by runners. */
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
index 23fa440..d99ee84 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
@@ -42,8 +42,9 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleF
  *
  * <p>SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
  * builder.setUrn(SimpleMonitoringInfoBuilder.ELEMENT_COUNT_URN); builder.setInt64Value(1);
- * builder.setPTransformLabel("myTransform"); builder.setPCollectionLabel("myPcollection");
- * MonitoringInfo mi = builder.build();
+ * builder.setPTransformLabel("myTransform");
+ * builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "myTransform"); MonitoringInfo mi =
+ * builder.build();
  */
 public class SimpleMonitoringInfoBuilder {
   private final boolean validateAndDropInvalid;
@@ -99,6 +100,29 @@ public class SimpleMonitoringInfoBuilder {
     return this;
   }
 
+  /**
+   * Sets the IntDistributionData of the DistributionData in the MonitoringInfo, and the appropriate
+   * type URN.
+   */
+  public SimpleMonitoringInfoBuilder setInt64DistributionValue(DistributionData data) {
+    this.builder
+        .getMetricBuilder()
+        .getDistributionDataBuilder()
+        .getIntDistributionDataBuilder()
+        .setCount(data.count())
+        .setSum(data.sum())
+        .setMin(data.min())
+        .setMax(data.max());
+    this.setInt64DistributionTypeUrn();
+    return this;
+  }
+
+  /** Sets the appropriate type URN for int64 distribution tuples. */
+  public SimpleMonitoringInfoBuilder setInt64DistributionTypeUrn() {
+    this.builder.setType(MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64);
+    return this;
+  }
+
   /** Sets the the appropriate type URN for sum int64 counters. */
   public SimpleMonitoringInfoBuilder setInt64TypeUrn() {
     this.builder.setType(MonitoringInfoConstants.TypeUrns.SUM_INT64);
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidator.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidator.java
index 8ac8bd6..58aef95 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidator.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidator.java
@@ -50,7 +50,7 @@ public class SpecMonitoringInfoValidator {
     MonitoringInfoSpec spec = null;
 
     for (MonitoringInfoSpec specIterator : specs) {
-      if (monitoringInfo.getUrn().startsWith(specIterator.getUrn())) {
+      if (monitoringInfo.getUrn().equals(specIterator.getUrn())) {
         spec = specIterator;
         break;
       }
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
index 61e4077..c2466ad 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
@@ -183,6 +183,62 @@ public class MetricsContainerImplTest {
   }
 
   @Test
+  public void testMonitoringInfosArePopulatedForUserDistributions() {
+    MetricsContainerImpl testObject = new MetricsContainerImpl("step1");
+    DistributionCell c1 = testObject.getDistribution(MetricName.named("ns", "name1"));
+    DistributionCell c2 = testObject.getDistribution(MetricName.named("ns", "name2"));
+    c1.update(5L);
+    c2.update(4L);
+
+    SimpleMonitoringInfoBuilder builder1 = new SimpleMonitoringInfoBuilder();
+    builder1
+        .setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_COUNTER)
+        .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "ns")
+        .setLabel(MonitoringInfoConstants.Labels.NAME, "name1")
+        .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1")
+        .setInt64DistributionValue(DistributionData.create(5, 1, 5, 5));
+
+    SimpleMonitoringInfoBuilder builder2 = new SimpleMonitoringInfoBuilder();
+    builder2
+        .setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_COUNTER)
+        .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "ns")
+        .setLabel(MonitoringInfoConstants.Labels.NAME, "name2")
+        .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1")
+        .setInt64DistributionValue(DistributionData.create(4, 1, 4, 4));
+
+    ArrayList<MonitoringInfo> actualMonitoringInfos = new ArrayList<MonitoringInfo>();
+    for (MonitoringInfo mi : testObject.getMonitoringInfos()) {
+      actualMonitoringInfos.add(SimpleMonitoringInfoBuilder.copyAndClearTimestamp(mi));
+    }
+
+    assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build(), builder2.build()));
+  }
+
+  @Test
+  public void testMonitoringInfosArePopulatedForSystemDistributions() {
+    MetricsContainerImpl testObject = new MetricsContainerImpl("step1");
+    HashMap<String, String> labels = new HashMap<>();
+    labels.put(MonitoringInfoConstants.Labels.PCOLLECTION, "pcoll1");
+    DistributionCell c1 =
+        testObject.getDistribution(
+            MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.SAMPLED_BYTE_SIZE, labels));
+    c1.update(5L);
+
+    SimpleMonitoringInfoBuilder builder1 = new SimpleMonitoringInfoBuilder();
+    builder1
+        .setUrn(MonitoringInfoConstants.Urns.SAMPLED_BYTE_SIZE)
+        .setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, "pcoll1")
+        .setInt64DistributionValue(DistributionData.create(5, 1, 5, 5));
+
+    ArrayList<MonitoringInfo> actualMonitoringInfos = new ArrayList<MonitoringInfo>();
+    for (MonitoringInfo mi : testObject.getMonitoringInfos()) {
+      actualMonitoringInfos.add(SimpleMonitoringInfoBuilder.copyAndClearTimestamp(mi));
+    }
+
+    assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build()));
+  }
+
+  @Test
   public void testMonitoringInfosArePopulatedForABeamCounter() {
     MetricsContainerImpl testObject = new MetricsContainerImpl("step1");
     HashMap<String, String> labels = new HashMap<String, String>();
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilderTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilderTest.java
index ee42da4..1f4c608 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilderTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilderTest.java
@@ -21,6 +21,7 @@ import static junit.framework.TestCase.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.beam.model.pipeline.v1.MetricsApi;
 import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -44,9 +45,9 @@ public class SimpleMonitoringInfoBuilderTest {
   public void testReturnsExpectedMonitoringInfo() {
     SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
     builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
+
     builder.setInt64Value(1);
     builder.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, "myPcollection");
-
     // Pass now that the spec is fully met.
     MonitoringInfo monitoringInfo = builder.build();
     assertTrue(monitoringInfo != null);
@@ -60,4 +61,32 @@ public class SimpleMonitoringInfoBuilderTest {
         "myPcollection",
         monitoringInfo.getLabelsMap().get(MonitoringInfoConstants.Labels.PCOLLECTION));
   }
+
+  @Test
+  public void testUserDistribution() {
+    SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
+    builder.setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_COUNTER);
+    builder.setLabel(MonitoringInfoConstants.Labels.NAME, "myName");
+    builder.setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "myNamespace");
+    builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "myStep");
+    assertNull(builder.build());
+
+    builder.setInt64DistributionValue(DistributionData.create(10, 2, 1, 9));
+    // Pass now that the spec is fully met.
+    MonitoringInfo monitoringInfo = builder.build();
+    assertTrue(monitoringInfo != null);
+    assertEquals(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_COUNTER, monitoringInfo.getUrn());
+    assertEquals(
+        "myName", monitoringInfo.getLabelsOrDefault(MonitoringInfoConstants.Labels.NAME, ""));
+    assertEquals(
+        "myNamespace",
+        monitoringInfo.getLabelsOrDefault(MonitoringInfoConstants.Labels.NAMESPACE, ""));
+    assertEquals(MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64, monitoringInfo.getType());
+    MetricsApi.IntDistributionData distribution =
+        monitoringInfo.getMetric().getDistributionData().getIntDistributionData();
+    assertEquals(10, distribution.getSum());
+    assertEquals(2, distribution.getCount());
+    assertEquals(9, distribution.getMax());
+    assertEquals(1, distribution.getMin());
+  }
 }
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidatorTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidatorTest.java
index a993545..6a0e936 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidatorTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidatorTest.java
@@ -40,7 +40,9 @@ public class SpecMonitoringInfoValidatorTest {
   public void validateReturnsErrorOnInvalidMonitoringInfoType() {
     MonitoringInfo testInput =
         MonitoringInfo.newBuilder()
-            .setUrn("beam:metric:user:someCounter")
+            .setUrn("beam:metric:user")
+            .putLabels(MonitoringInfoConstants.Labels.NAME, "anyCounter")
+            .putLabels(MonitoringInfoConstants.Labels.NAMESPACE, "namespace")
             .setType("beam:metrics:bad_value")
             .build();
     assertTrue(testObject.validate(testInput).isPresent());
@@ -60,6 +62,17 @@ public class SpecMonitoringInfoValidatorTest {
 
     testInput =
         MonitoringInfo.newBuilder()
+            .setUrn(Urns.USER_DISTRIBUTION_COUNTER)
+            .putLabels(MonitoringInfoConstants.Labels.NAME, "anyDistribution")
+            .putLabels(MonitoringInfoConstants.Labels.NAMESPACE, "namespace")
+            .putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "anyString")
+            .setType(TypeUrns.DISTRIBUTION_INT64)
+            .putLabels("dummy", "value")
+            .build();
+    assertFalse(testObject.validate(testInput).isPresent());
+
+    testInput =
+        MonitoringInfo.newBuilder()
             .setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT)
             .setType(TypeUrns.SUM_INT64)
             .putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "value")
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 1d87dd6..3aea2b9 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -52,6 +52,7 @@ import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.core.construction.graph.FusedPipeline;
 import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.core.metrics.DistributionData;
 import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
 import org.apache.beam.runners.core.metrics.MonitoringInfoConstants.Urns;
 import org.apache.beam.runners.core.metrics.MonitoringInfoMatchers;
@@ -500,6 +501,9 @@ public class RemoteExecutionTest implements Serializable {
     final String processUserCounterName = "processUserCounter";
     final String startUserCounterName = "startUserCounter";
     final String finishUserCounterName = "finishUserCounter";
+    final String processUserDistributionName = "processUserDistribution";
+    final String startUserDistributionName = "startUserDistribution";
+    final String finishUserDistributionName = "finishUserDistribution";
     Pipeline p = Pipeline.create();
     // TODO(BEAM-6597): Remove sleeps in this test after collecting MonitoringInfos in
     // ProcessBundleProgressResponses. Use CountDownLatches to wait in start, finish and process
@@ -518,6 +522,8 @@ public class RemoteExecutionTest implements Serializable {
                       public void startBundle() throws InterruptedException {
                         Thread.sleep(1000);
                         startCounter.inc(10);
+                        Metrics.distribution(RemoteExecutionTest.class, startUserDistributionName)
+                            .update(10);
                       }
 
                       @SuppressWarnings("unused")
@@ -531,6 +537,9 @@ public class RemoteExecutionTest implements Serializable {
                           ctxt.output("two");
                           Thread.sleep(1000);
                           Metrics.counter(RemoteExecutionTest.class, processUserCounterName).inc();
+                          Metrics.distribution(
+                                  RemoteExecutionTest.class, processUserDistributionName)
+                              .update(1);
                         }
                         emitted = true;
                       }
@@ -539,6 +548,8 @@ public class RemoteExecutionTest implements Serializable {
                       public void finishBundle() throws InterruptedException {
                         Thread.sleep(1000);
                         Metrics.counter(RemoteExecutionTest.class, finishUserCounterName).inc(100);
+                        Metrics.distribution(RemoteExecutionTest.class, finishUserDistributionName)
+                            .update(100);
                       }
                     }))
             .setCoder(StringUtf8Coder.of());
@@ -624,6 +635,7 @@ public class RemoteExecutionTest implements Serializable {
           public void onCompleted(ProcessBundleResponse response) {
             List<Matcher<MonitoringInfo>> matchers = new ArrayList<Matcher<MonitoringInfo>>();
 
+            // User Counters.
             SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
             builder
                 .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
@@ -657,6 +669,39 @@ public class RemoteExecutionTest implements Serializable {
             builder.setInt64Value(100);
             matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
 
+            // User Distributions.
+            builder
+                .setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_COUNTER)
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAMESPACE, RemoteExecutionTest.class.getName())
+                .setLabel(MonitoringInfoConstants.Labels.NAME, processUserDistributionName);
+            builder.setLabel(
+                MonitoringInfoConstants.Labels.PTRANSFORM, "create/ParMultiDo(Anonymous)");
+            builder.setInt64DistributionValue(DistributionData.create(1, 1, 1, 1));
+            matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
+
+            builder = new SimpleMonitoringInfoBuilder();
+            builder
+                .setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_COUNTER)
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAMESPACE, RemoteExecutionTest.class.getName())
+                .setLabel(MonitoringInfoConstants.Labels.NAME, startUserDistributionName);
+            builder.setLabel(
+                MonitoringInfoConstants.Labels.PTRANSFORM, "create/ParMultiDo(Anonymous)");
+            builder.setInt64DistributionValue(DistributionData.create(10, 1, 10, 10));
+            matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
+
+            builder = new SimpleMonitoringInfoBuilder();
+            builder
+                .setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_COUNTER)
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAMESPACE, RemoteExecutionTest.class.getName())
+                .setLabel(MonitoringInfoConstants.Labels.NAME, finishUserDistributionName);
+            builder.setLabel(
+                MonitoringInfoConstants.Labels.PTRANSFORM, "create/ParMultiDo(Anonymous)");
+            builder.setInt64DistributionValue(DistributionData.create(100, 1, 100, 100));
+            matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
+
             // The element counter should be counted only once for the pcollection.
             // So there should be only two elements.
             builder = new SimpleMonitoringInfoBuilder();
diff --git a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/DAGBuilder.java b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/DAGBuilder.java
index 7661113..3886e85 100644
--- a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/DAGBuilder.java
+++ b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/DAGBuilder.java
@@ -101,9 +101,7 @@ public class DAGBuilder {
   }
 
   Vertex addVertex(String id, SupplierEx<Processor> processor) {
-    return dag.newVertex(id, processor)
-        .localParallelism(localParallelism)
-    ;
+    return dag.newVertex(id, processor).localParallelism(localParallelism);
   }
 
   private void wireUp() {
@@ -208,7 +206,7 @@ public class DAGBuilder {
 
     @Override
     public Object applyEx(byte[] b) throws Exception {
-      Object t = CoderUtils.decodeFromByteArray(coder, b); //todo: decoding twice....
+      Object t = CoderUtils.decodeFromByteArray(coder, b); // todo: decoding twice....
       Object key = null;
       if (t instanceof WindowedValue) {
         t = ((WindowedValue) t).getValue();
diff --git a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/JetRunner.java b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/JetRunner.java
index efad9d3..562f6a4 100644
--- a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/JetRunner.java
+++ b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/JetRunner.java
@@ -87,10 +87,8 @@ public class JetRunner extends PipelineRunner<PipelineResult> {
   public PipelineResult run(Pipeline pipeline) {
     Boolean startOwnCluster = options.getJetStartOwnCluster();
     if (startOwnCluster) {
-      Collection<JetInstance> jetInstances = Arrays.asList(
-          Jet.newJetInstance(),
-          Jet.newJetInstance()
-      );
+      Collection<JetInstance> jetInstances =
+          Arrays.asList(Jet.newJetInstance(), Jet.newJetInstance());
       LOG.info("Started " + jetInstances.size() + " Jet cluster members");
     }
     try {
diff --git a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/JetTransformTranslator.java b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/JetTransformTranslator.java
index 44b543c..ed4a76f 100644
--- a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/JetTransformTranslator.java
+++ b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/JetTransformTranslator.java
@@ -25,5 +25,9 @@ import org.apache.beam.sdk.transforms.PTransform;
 
 interface JetTransformTranslator<T extends PTransform> {
 
-  Vertex translate(Pipeline pipeline, AppliedPTransform<?, ?, ?> appliedTransform, TransformHierarchy.Node node, JetTranslationContext context);
+  Vertex translate(
+      Pipeline pipeline,
+      AppliedPTransform<?, ?, ?> appliedTransform,
+      TransformHierarchy.Node node,
+      JetTranslationContext context);
 }
diff --git a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/JetTransformTranslators.java b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/JetTransformTranslators.java
index 71f1264..c672eaf 100644
--- a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/JetTransformTranslators.java
+++ b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/JetTransformTranslators.java
@@ -91,16 +91,21 @@ class JetTransformTranslators {
       implements JetTransformTranslator<PTransform<PBegin, PCollection<T>>> {
 
     @Override
-    public Vertex translate(Pipeline pipeline, AppliedPTransform<?, ?, ?> appliedTransform, Node node, JetTranslationContext context) {
+    public Vertex translate(
+        Pipeline pipeline,
+        AppliedPTransform<?, ?, ?> appliedTransform,
+        Node node,
+        JetTranslationContext context) {
       if (!Utils.isBounded(appliedTransform)) {
         throw new UnsupportedOperationException(); // todo
       }
 
       BoundedSource<T> source;
       try {
-        source = ReadTranslation.boundedSourceFromTransform(
-            (AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>>) appliedTransform
-        );
+        source =
+            ReadTranslation.boundedSourceFromTransform(
+                (AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>>)
+                    appliedTransform);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
@@ -129,7 +134,11 @@ class JetTransformTranslators {
       implements JetTransformTranslator<PTransform<PCollection, PCollectionTuple>> {
 
     @Override
-    public Vertex translate(Pipeline pipeline, AppliedPTransform<?, ?, ?> appliedTransform, Node node, JetTranslationContext context) {
+    public Vertex translate(
+        Pipeline pipeline,
+        AppliedPTransform<?, ?, ?> appliedTransform,
+        Node node,
+        JetTranslationContext context) {
       boolean usesStateOrTimers = Utils.usesStateOrTimers(appliedTransform);
       DoFn<?, ?> doFn = Utils.getDoFn(appliedTransform);
 
@@ -234,7 +243,11 @@ class JetTransformTranslators {
           PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>>> {
 
     @Override
-    public Vertex translate(Pipeline pipeline, AppliedPTransform<?, ?, ?> appliedTransform, Node node, JetTranslationContext context) {
+    public Vertex translate(
+        Pipeline pipeline,
+        AppliedPTransform<?, ?, ?> appliedTransform,
+        Node node,
+        JetTranslationContext context) {
       String transformName = appliedTransform.getFullName();
 
       PCollection<KV<K, InputT>> input = Utils.getInput(appliedTransform);
@@ -271,12 +284,18 @@ class JetTransformTranslators {
       implements JetTransformTranslator<PTransform<PCollection<T>, PCollection<T>>> {
 
     @Override
-    public Vertex translate(Pipeline pipeline, AppliedPTransform<?, ?, ?> appliedTransform, Node node, JetTranslationContext context) {
+    public Vertex translate(
+        Pipeline pipeline,
+        AppliedPTransform<?, ?, ?> appliedTransform,
+        Node node,
+        JetTranslationContext context) {
       PCollectionView<T> view;
       try {
-        view = CreatePCollectionViewTranslation.getView(
-            (AppliedPTransform<PCollection<T>, PCollection<T>, PTransform<PCollection<T>, PCollection<T>>>) appliedTransform
-        );
+        view =
+            CreatePCollectionViewTranslation.getView(
+                (AppliedPTransform<
+                        PCollection<T>, PCollection<T>, PTransform<PCollection<T>, PCollection<T>>>)
+                    appliedTransform);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
@@ -311,7 +330,11 @@ class JetTransformTranslators {
       implements JetTransformTranslator<PTransform<PCollectionList<T>, PCollection<T>>> {
 
     @Override
-    public Vertex translate(Pipeline pipeline, AppliedPTransform<?, ?, ?> appliedTransform, Node node, JetTranslationContext context) {
+    public Vertex translate(
+        Pipeline pipeline,
+        AppliedPTransform<?, ?, ?> appliedTransform,
+        Node node,
+        JetTranslationContext context) {
       Collection<PValue> mainInputs = Utils.getMainInputs(pipeline, node);
       Map<String, Coder> inputCoders =
           Utils.getCoders(
@@ -341,7 +364,11 @@ class JetTransformTranslators {
   private static class WindowTranslator<T>
       implements JetTransformTranslator<PTransform<PCollection<T>, PCollection<T>>> {
     @Override
-    public Vertex translate(Pipeline pipeline, AppliedPTransform<?, ?, ?> appliedTransform, Node node, JetTranslationContext context) {
+    public Vertex translate(
+        Pipeline pipeline,
+        AppliedPTransform<?, ?, ?> appliedTransform,
+        Node node,
+        JetTranslationContext context) {
       WindowingStrategy<T, BoundedWindow> windowingStrategy =
           (WindowingStrategy<T, BoundedWindow>)
               ((PCollection) Utils.getOutput(appliedTransform).getValue()).getWindowingStrategy();
@@ -372,7 +399,11 @@ class JetTransformTranslators {
   private static class ImpulseTranslator
       implements JetTransformTranslator<PTransform<PBegin, PCollection<byte[]>>> {
     @Override
-    public Vertex translate(Pipeline pipeline, AppliedPTransform<?, ?, ?> appliedTransform, Node node, JetTranslationContext context) {
+    public Vertex translate(
+        Pipeline pipeline,
+        AppliedPTransform<?, ?, ?> appliedTransform,
+        Node node,
+        JetTranslationContext context) {
       String transformName = appliedTransform.getFullName();
       DAGBuilder dagBuilder = context.getDagBuilder();
       String vertexId = dagBuilder.newVertexId(transformName);
@@ -391,7 +422,11 @@ class JetTransformTranslators {
   private static class TestStreamTranslator<T>
       implements JetTransformTranslator<PTransform<PBegin, PCollection<T>>> {
     @Override
-    public Vertex translate(Pipeline pipeline, AppliedPTransform<?, ?, ?> appliedTransform, Node node, JetTranslationContext context) {
+    public Vertex translate(
+        Pipeline pipeline,
+        AppliedPTransform<?, ?, ?> appliedTransform,
+        Node node,
+        JetTranslationContext context) {
       String transformName = appliedTransform.getFullName();
       DAGBuilder dagBuilder = context.getDagBuilder();
       String vertexId = dagBuilder.newVertexId(transformName);
@@ -402,10 +437,12 @@ class JetTransformTranslators {
       // the collection.
       Map.Entry<TupleTag<?>, PValue> output = Utils.getOutput(appliedTransform);
       Coder outputCoder = Utils.getCoder((PCollection) output.getValue());
-      TestStream.TestStreamCoder<T> payloadCoder = TestStream.TestStreamCoder.of(testStream.getValueCoder());
+      TestStream.TestStreamCoder<T> payloadCoder =
+          TestStream.TestStreamCoder.of(testStream.getValueCoder());
       byte[] encodedPayload = getEncodedPayload(testStream, payloadCoder);
       Vertex vertex =
-          dagBuilder.addVertex(vertexId, TestStreamP.supplier(encodedPayload, payloadCoder, outputCoder));
+          dagBuilder.addVertex(
+              vertexId, TestStreamP.supplier(encodedPayload, payloadCoder, outputCoder));
 
       String outputEdgeId = Utils.getTupleTagId(output.getValue());
       dagBuilder.registerCollectionOfEdge(outputEdgeId, output.getKey().getId());
@@ -413,7 +450,8 @@ class JetTransformTranslators {
       return vertex;
     }
 
-    private static <T> byte[] getEncodedPayload(TestStream<T> testStream, TestStream.TestStreamCoder<T> coder) {
+    private static <T> byte[] getEncodedPayload(
+        TestStream<T> testStream, TestStream.TestStreamCoder<T> coder) {
       try {
         return CoderUtils.encodeToByteArray(coder, testStream);
       } catch (CoderException e) {
diff --git a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java
index 08c5ec2..519e612 100644
--- a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java
+++ b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java
@@ -131,7 +131,8 @@ public class JetMetricResults extends MetricResults
           .toList();
     }
 
-    private MetricResult<DistributionResult> distributionUpdateToResult(Map.Entry<MetricKey, DistributionData> entry) {
+    private MetricResult<DistributionResult> distributionUpdateToResult(
+        Map.Entry<MetricKey, DistributionData> entry) {
       MetricKey key = entry.getKey();
       DistributionResult distributionResult = entry.getValue().extractResult();
       return MetricResult.create(key, distributionResult, distributionResult);
diff --git a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/processors/BoundedSourceP.java b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/processors/BoundedSourceP.java
index 37f58e8..5e52a4f 100644
--- a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/processors/BoundedSourceP.java
+++ b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/processors/BoundedSourceP.java
@@ -90,7 +90,8 @@ public class BoundedSourceP<T> extends AbstractProcessor implements Traverser {
       }
       return outputCoder == null
           ? res
-          : Utils.encodeWindowedValue(res, outputCoder); // todo: this is not nice, have done this only as a quick fix for
+          : Utils.encodeWindowedValue(
+              res, outputCoder); // todo: this is not nice, have done this only as a quick fix for
       // BoundedSourcePTest
     } catch (IOException e) {
       throw rethrow(e);
diff --git a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/processors/TestStreamP.java b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/processors/TestStreamP.java
index aff3ac9..cf491d5 100644
--- a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/processors/TestStreamP.java
+++ b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/processors/TestStreamP.java
@@ -47,34 +47,42 @@ public class TestStreamP extends AbstractProcessor {
   @SuppressWarnings("unchecked")
   private TestStreamP(byte[] payload, TestStream.TestStreamCoder payloadCoder, Coder outputCoder) {
     List events = decodePayload(payload, payloadCoder).getEvents();
-    traverser = Traversers.traverseStream(
-        events.stream()
-            .flatMap(
-                event -> {
-                  if (event instanceof TestStream.WatermarkEvent) {
-                    Instant watermark = ((TestStream.WatermarkEvent) event).getWatermark();
-                    if (BoundedWindow.TIMESTAMP_MAX_VALUE.equals(watermark)) {
-                      // this is an element added by advanceWatermarkToInfinity(), we ignore it,
-                      // it's always at the end
-                      return null;
-                    }
-                    return Stream.of(new Watermark(watermark.getMillis()));
-                  } else if (event instanceof TestStream.ElementEvent) {
-                    return StreamSupport.stream(((TestStream.ElementEvent<?>) event).getElements().spliterator(), false)
-                        .map(tv -> WindowedValue.timestampedValueInGlobalWindow(tv.getValue(), tv.getTimestamp()))
-                        .map(wV -> Utils.encodeWindowedValue(wV, outputCoder));
-                  } else {
-                    throw new UnsupportedOperationException("Event type not supported in TestStream: " + event.getClass() + ", event: " + event);
-                  }
-                }
-            )
-    );
+    traverser =
+        Traversers.traverseStream(
+            events.stream()
+                .flatMap(
+                    event -> {
+                      if (event instanceof TestStream.WatermarkEvent) {
+                        Instant watermark = ((TestStream.WatermarkEvent) event).getWatermark();
+                        if (BoundedWindow.TIMESTAMP_MAX_VALUE.equals(watermark)) {
+                          // this is an element added by advanceWatermarkToInfinity(), we ignore it,
+                          // it's always at the end
+                          return null;
+                        }
+                        return Stream.of(new Watermark(watermark.getMillis()));
+                      } else if (event instanceof TestStream.ElementEvent) {
+                        return StreamSupport.stream(
+                                ((TestStream.ElementEvent<?>) event).getElements().spliterator(),
+                                false)
+                            .map(
+                                tv ->
+                                    WindowedValue.timestampedValueInGlobalWindow(
+                                        tv.getValue(), tv.getTimestamp()))
+                            .map(wV -> Utils.encodeWindowedValue(wV, outputCoder));
+                      } else {
+                        throw new UnsupportedOperationException(
+                            "Event type not supported in TestStream: "
+                                + event.getClass()
+                                + ", event: "
+                                + event);
+                      }
+                    }));
   }
 
-  public static <T> ProcessorMetaSupplier supplier(byte[] payload, TestStream.TestStreamCoder payloadCoder, Coder outputCoder) {
+  public static <T> ProcessorMetaSupplier supplier(
+      byte[] payload, TestStream.TestStreamCoder payloadCoder, Coder outputCoder) {
     return ProcessorMetaSupplier.forceTotalParallelismOne(
-        ProcessorSupplier.of(() -> new TestStreamP(payload, payloadCoder, outputCoder))
-    );
+        ProcessorSupplier.of(() -> new TestStreamP(payload, payloadCoder, outputCoder)));
   }
 
   private static TestStream decodePayload(byte[] payload, TestStream.TestStreamCoder coder) {
diff --git a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/processors/WindowGroupP.java b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/processors/WindowGroupP.java
index 891be0e..29c4977 100644
--- a/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/processors/WindowGroupP.java
+++ b/runners/jet-experimental/src/main/java/org/apache/beam/runners/jet/processors/WindowGroupP.java
@@ -109,7 +109,8 @@ public class WindowGroupP<K, V> extends AbstractProcessor {
                         windowedValue.getTimestamp(),
                         windowedValue.getWindows(),
                         windowedValue.getPane());
-                KeyManager keyManager = keyManagers.computeIfAbsent(key, k -> new KeyManager(k, latestWatermark));
+                KeyManager keyManager =
+                    keyManagers.computeIfAbsent(key, k -> new KeyManager(k, latestWatermark));
                 keyManager.processElement(updatedWindowedValue);
               }
               return appendableTraverser;