Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/21 23:20:59 UTC

[GitHub] [beam] lukecwik opened a new pull request #11487: [BEAM-6597, BEAM-6467] Add support for reporting monitoring infos to the Java SDK harness.

lukecwik opened a new pull request #11487:
URL: https://github.com/apache/beam/pull/11487


   This also exposes a requestProgress API on the RemoteBundle allowing runners to request progress.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lostluck commented on issue #11487: [BEAM-6597, BEAM-6467] Add support for reporting monitoring infos to the Java SDK harness.

Posted by GitBox <gi...@apache.org>.
lostluck commented on issue #11487:
URL: https://github.com/apache/beam/pull/11487#issuecomment-617860776


   Looks like this broke the Go Portable Postcommit tests on Flink and Spark 
   https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/3093/
   https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/3155/





[GitHub] [beam] lukecwik commented on issue #11487: [BEAM-6597, BEAM-6467] Add support for reporting monitoring infos to the Java SDK harness.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11487:
URL: https://github.com/apache/beam/pull/11487#issuecomment-617973750


   Initial look doesn't seem 
   
   > Looks like this broke the Go Portable Postcommit tests on Flink and Spark
   > https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/3093/
   > https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/3155/
   
   I took a look and ran the test on my pre-merge branch (called `metrics`), and it passed. It looks like another change may have broken these tests.





[GitHub] [beam] lukecwik commented on issue #11487: [BEAM-6597, BEAM-6467] Add support for reporting monitoring infos to the Java SDK harness.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11487:
URL: https://github.com/apache/beam/pull/11487#issuecomment-617543583


   Run Java PreCommit





[GitHub] [beam] lukecwik commented on issue #11487: [BEAM-6597, BEAM-6467] Add support for reporting monitoring infos to the Java SDK harness.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11487:
URL: https://github.com/apache/beam/pull/11487#issuecomment-617496139


   Run Java PreCommit





[GitHub] [beam] lukecwik commented on a change in pull request #11487: [BEAM-6597, BEAM-6467] Add support for reporting monitoring infos to the Java SDK harness.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11487:
URL: https://github.com/apache/beam/pull/11487#discussion_r412605654



##########
File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
##########
@@ -630,72 +644,100 @@ public void process(ProcessContext ctxt) {
               (Coder<WindowedValue<?>>) remoteOutputCoder.getValue(), outputContents::add));
     }
 
-    Iterable<String> sideInputData = Arrays.asList("A", "B", "C");
+    final String testPTransformId = "create/ParMultiDo(Metrics)";
+    BundleProgressHandler progressHandler =
+        new BundleProgressHandler() {
+          @Override
+          public void onProgress(ProcessBundleProgressResponse response) {
+            MetricsDoFn.ALLOW_COMPLETION.get(metricsDoFn.uuid).countDown();
+            List<Matcher<MonitoringInfo>> matchers = new ArrayList<>();
 
-    StateRequestHandler stateRequestHandler =
-        StateRequestHandlers.forSideInputHandlerFactory(
-            descriptor.getSideInputSpecs(),
-            new SideInputHandlerFactory() {
-              @Override
-              public <V, W extends BoundedWindow>
-                  IterableSideInputHandler<V, W> forIterableSideInput(
-                      String pTransformId,
-                      String sideInputId,
-                      Coder<V> elementCoder,
-                      Coder<W> windowCoder) {
-                throw new UnsupportedOperationException();
-              }
+            // We expect all user counters except for the ones in @FinishBundle
+            // Since non-user metrics are registered at bundle creation time, they will still report
+            // values most of which will be 0.
 
-              @Override
-              public <K, V, W extends BoundedWindow>
-                  MultimapSideInputHandler<K, V, W> forMultimapSideInput(
-                      String pTransformId,
-                      String sideInputId,
-                      KvCoder<K, V> elementCoder,
-                      Coder<W> windowCoder) {
-                return new MultimapSideInputHandler<K, V, W>() {
-                  @Override
-                  public Iterable<V> get(BoundedWindow window) {
-                    return null;
-                  }
+            SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
+            builder
+                .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAMESPACE, RemoteExecutionTest.class.getName())
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAME, MetricsDoFn.PROCESS_USER_COUNTER_NAME);
+            builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, testPTransformId);
+            builder.setInt64SumValue(1);
+            matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
 
-                  @Override
-                  public Coder<K> keyCoder() {
-                    return elementCoder.getKeyCoder();
-                  }
+            builder = new SimpleMonitoringInfoBuilder();
+            builder
+                .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAMESPACE, RemoteExecutionTest.class.getName())
+                .setLabel(MonitoringInfoConstants.Labels.NAME, MetricsDoFn.START_USER_COUNTER_NAME);
+            builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, testPTransformId);
+            builder.setInt64SumValue(10);
+            matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
 
-                  @Override
-                  public Coder<V> valueCoder() {
-                    return elementCoder.getValueCoder();
-                  }
+            builder = new SimpleMonitoringInfoBuilder();
+            builder
+                .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAMESPACE, RemoteExecutionTest.class.getName())
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAME, MetricsDoFn.FINISH_USER_COUNTER_NAME);
+            builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, testPTransformId);
+            matchers.add(not(MonitoringInfoMatchers.matchSetFields(builder.build())));
 
-                  @Override
-                  public Iterable<V> get(K key, W window) {
-                    return (Iterable) sideInputData;
-                  }
-                };
-              }
-            });
+            // User Distributions.
+            builder
+                .setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64)
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAMESPACE, RemoteExecutionTest.class.getName())
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAME,
+                    MetricsDoFn.PROCESS_USER_DISTRIBUTION_NAME);
+            builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, testPTransformId);
+            builder.setInt64DistributionValue(DistributionData.create(1, 1, 1, 1));
+            matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
 
-    String testPTransformId = "create/ParMultiDo(Anonymous)";
-    BundleProgressHandler progressHandler =
-        new BundleProgressHandler() {
-          @Override
-          public void onProgress(ProcessBundleProgressResponse progress) {}
+            builder = new SimpleMonitoringInfoBuilder();
+            builder
+                .setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64)
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAMESPACE, RemoteExecutionTest.class.getName())
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAME, MetricsDoFn.START_USER_DISTRIBUTION_NAME);
+            builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, testPTransformId);
+            builder.setInt64DistributionValue(DistributionData.create(10, 1, 10, 10));

Review comment:
       The values should change only if the test changes, since each one is expected to be emitted once. Really, only the msec counters are non-deterministic.
   
   Also, the MonitoringInfo matcher only compares set fields and ignores fields that aren't set.
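
To illustrate the "only compares set fields" behaviour described above, here is a minimal, self-contained sketch. It does not use Beam or Hamcrest: `InfoSketch` and `SetFieldsMatcherSketch` are hypothetical stand-ins invented for this example, and the URN and label strings are made up. It only assumes that an unset field on the expected value acts as a wildcard.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical stand-in for a MonitoringInfo; this is NOT the Beam proto. */
final class InfoSketch {
  final String urn;                 // null means "not set"
  final Map<String, String> labels; // empty means "no labels set"
  final Long sumValue;              // null means "not set"

  InfoSketch(String urn, Map<String, String> labels, Long sumValue) {
    this.urn = urn;
    this.labels = new HashMap<>(labels);
    this.sumValue = sumValue;
  }
}

final class SetFieldsMatcherSketch {
  /** Returns a predicate that checks only the fields that are set on {@code expected}. */
  static Predicate<InfoSketch> matchSetFields(InfoSketch expected) {
    return actual -> {
      if (expected.urn != null && !expected.urn.equals(actual.urn)) {
        return false;
      }
      // Every label set on the expected value must be present with the same value.
      for (Map.Entry<String, String> label : expected.labels.entrySet()) {
        if (!label.getValue().equals(actual.labels.get(label.getKey()))) {
          return false;
        }
      }
      // An unset expected value matches any actual value.
      return expected.sumValue == null || expected.sumValue.equals(actual.sumValue);
    };
  }

  public static void main(String[] args) {
    Map<String, String> labels = new HashMap<>();
    labels.put("NAME", "processCounter"); // made-up label key and value
    InfoSketch reported = new InfoSketch("example:urn:sum", labels, 1L);

    // Template without a value: matches regardless of the reported sum.
    InfoSketch template = new InfoSketch("example:urn:sum", labels, null);
    System.out.println(matchSetFields(template).test(reported));

    // Template with a different value: does not match.
    InfoSketch wrongValue = new InfoSketch("example:urn:sum", labels, 2L);
    System.out.println(matchSetFields(wrongValue).test(reported));
  }
}
```

Under this reading, a template built without a value and wrapped in a negation (as the diff does for the finish-bundle counter) asserts that no info with that URN and those labels was reported at all, whatever its value.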







[GitHub] [beam] lukecwik commented on issue #11487: [BEAM-6597, BEAM-6467] Add support for reporting monitoring infos to the Java SDK harness.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11487:
URL: https://github.com/apache/beam/pull/11487#issuecomment-617461562


   R: @ajamato 
   CC: @boyuanzz @ananvay 





[GitHub] [beam] lukecwik commented on issue #11487: [BEAM-6597, BEAM-6467] Add support for reporting monitoring infos to the Java SDK harness.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11487:
URL: https://github.com/apache/beam/pull/11487#issuecomment-617964772


   Taking a look.





[GitHub] [beam] ajamato commented on a change in pull request #11487: [BEAM-6597, BEAM-6467] Add support for reporting monitoring infos to the Java SDK harness.

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #11487:
URL: https://github.com/apache/beam/pull/11487#discussion_r412564549



##########
File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
##########
@@ -630,72 +644,100 @@ public void process(ProcessContext ctxt) {
               (Coder<WindowedValue<?>>) remoteOutputCoder.getValue(), outputContents::add));
     }
 
-    Iterable<String> sideInputData = Arrays.asList("A", "B", "C");
+    final String testPTransformId = "create/ParMultiDo(Metrics)";
+    BundleProgressHandler progressHandler =
+        new BundleProgressHandler() {
+          @Override
+          public void onProgress(ProcessBundleProgressResponse response) {
+            MetricsDoFn.ALLOW_COMPLETION.get(metricsDoFn.uuid).countDown();
+            List<Matcher<MonitoringInfo>> matchers = new ArrayList<>();
 
-    StateRequestHandler stateRequestHandler =
-        StateRequestHandlers.forSideInputHandlerFactory(
-            descriptor.getSideInputSpecs(),
-            new SideInputHandlerFactory() {
-              @Override
-              public <V, W extends BoundedWindow>
-                  IterableSideInputHandler<V, W> forIterableSideInput(
-                      String pTransformId,
-                      String sideInputId,
-                      Coder<V> elementCoder,
-                      Coder<W> windowCoder) {
-                throw new UnsupportedOperationException();
-              }
+            // We expect all user counters except for the ones in @FinishBundle
+            // Since non-user metrics are registered at bundle creation time, they will still report
+            // values most of which will be 0.
 
-              @Override
-              public <K, V, W extends BoundedWindow>
-                  MultimapSideInputHandler<K, V, W> forMultimapSideInput(
-                      String pTransformId,
-                      String sideInputId,
-                      KvCoder<K, V> elementCoder,
-                      Coder<W> windowCoder) {
-                return new MultimapSideInputHandler<K, V, W>() {
-                  @Override
-                  public Iterable<V> get(BoundedWindow window) {
-                    return null;
-                  }
+            SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
+            builder
+                .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAMESPACE, RemoteExecutionTest.class.getName())
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAME, MetricsDoFn.PROCESS_USER_COUNTER_NAME);
+            builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, testPTransformId);
+            builder.setInt64SumValue(1);
+            matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
 
-                  @Override
-                  public Coder<K> keyCoder() {
-                    return elementCoder.getKeyCoder();
-                  }
+            builder = new SimpleMonitoringInfoBuilder();
+            builder
+                .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAMESPACE, RemoteExecutionTest.class.getName())
+                .setLabel(MonitoringInfoConstants.Labels.NAME, MetricsDoFn.START_USER_COUNTER_NAME);
+            builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, testPTransformId);
+            builder.setInt64SumValue(10);
+            matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
 
-                  @Override
-                  public Coder<V> valueCoder() {
-                    return elementCoder.getValueCoder();
-                  }
+            builder = new SimpleMonitoringInfoBuilder();
+            builder
+                .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAMESPACE, RemoteExecutionTest.class.getName())
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAME, MetricsDoFn.FINISH_USER_COUNTER_NAME);
+            builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, testPTransformId);
+            matchers.add(not(MonitoringInfoMatchers.matchSetFields(builder.build())));
 
-                  @Override
-                  public Iterable<V> get(K key, W window) {
-                    return (Iterable) sideInputData;
-                  }
-                };
-              }
-            });
+            // User Distributions.
+            builder
+                .setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64)
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAMESPACE, RemoteExecutionTest.class.getName())
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAME,
+                    MetricsDoFn.PROCESS_USER_DISTRIBUTION_NAME);
+            builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, testPTransformId);
+            builder.setInt64DistributionValue(DistributionData.create(1, 1, 1, 1));
+            matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
 
-    String testPTransformId = "create/ParMultiDo(Anonymous)";
-    BundleProgressHandler progressHandler =
-        new BundleProgressHandler() {
-          @Override
-          public void onProgress(ProcessBundleProgressResponse progress) {}
+            builder = new SimpleMonitoringInfoBuilder();
+            builder
+                .setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64)
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAMESPACE, RemoteExecutionTest.class.getName())
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAME, MetricsDoFn.START_USER_DISTRIBUTION_NAME);
+            builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, testPTransformId);
+            builder.setInt64DistributionValue(DistributionData.create(10, 1, 10, 10));

Review comment:
       Are these values likely to change easily? You could consider adding new matchers to MonitoringInfoMatchers that only check that the values are non-zero (or something similar), which would be easier to maintain.
   
   Alternatively, write matchers that verify just a few fields, unless you really want to verify that everything is set on every MonitoringInfo. That might also be simpler to maintain. If you think all the fields are relevant, keep it as is; it's up to your discretion.
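
The "non-zero" matcher idea above could look roughly like the following sketch. It is written against plain `java.util.function.Predicate` rather than Hamcrest or the real `MonitoringInfoMatchers` class; `Counter`, `hasName`, and `valueIsNonZero` are hypothetical names introduced only for illustration.

```java
import java.util.function.Predicate;

final class NonZeroMatcherSketch {
  /** Hypothetical minimal counter record: a metric name plus an int64 sum. */
  static final class Counter {
    final String name;
    final long sum;

    Counter(String name, long sum) {
      this.name = name;
      this.sum = sum;
    }
  }

  /** Matches counters with the given metric name. */
  static Predicate<Counter> hasName(String name) {
    return counter -> counter.name.equals(name);
  }

  /** Matches counters whose sum is anything other than zero. */
  static Predicate<Counter> valueIsNonZero() {
    return counter -> counter.sum != 0;
  }

  public static void main(String[] args) {
    // Identify the counter by name, but only require that some value was reported.
    Predicate<Counter> matcher = hasName("startCounter").and(valueIsNonZero());
    System.out.println(matcher.test(new Counter("startCounter", 10))); // prints "true"
    System.out.println(matcher.test(new Counter("startCounter", 0)));  // prints "false"
  }
}
```

The trade-off is that such a matcher keeps passing if the expected value changes, but it would also no longer catch a counter that reports the wrong non-zero value.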



