You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/22 01:38:35 UTC

[GitHub] [beam] lukecwik commented on a change in pull request #11487: [BEAM-6597, BEAM-6467] Add support for reporting monitoring infos to the Java SDK harness.

lukecwik commented on a change in pull request #11487:
URL: https://github.com/apache/beam/pull/11487#discussion_r412605654



##########
File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
##########
@@ -630,72 +644,100 @@ public void process(ProcessContext ctxt) {
               (Coder<WindowedValue<?>>) remoteOutputCoder.getValue(), outputContents::add));
     }
 
-    Iterable<String> sideInputData = Arrays.asList("A", "B", "C");
+    final String testPTransformId = "create/ParMultiDo(Metrics)";
+    BundleProgressHandler progressHandler =
+        new BundleProgressHandler() {
+          @Override
+          public void onProgress(ProcessBundleProgressResponse response) {
+            MetricsDoFn.ALLOW_COMPLETION.get(metricsDoFn.uuid).countDown();
+            List<Matcher<MonitoringInfo>> matchers = new ArrayList<>();
 
-    StateRequestHandler stateRequestHandler =
-        StateRequestHandlers.forSideInputHandlerFactory(
-            descriptor.getSideInputSpecs(),
-            new SideInputHandlerFactory() {
-              @Override
-              public <V, W extends BoundedWindow>
-                  IterableSideInputHandler<V, W> forIterableSideInput(
-                      String pTransformId,
-                      String sideInputId,
-                      Coder<V> elementCoder,
-                      Coder<W> windowCoder) {
-                throw new UnsupportedOperationException();
-              }
+            // We expect all user counters except for the ones in @FinishBundle
+            // Since non-user metrics are registered at bundle creation time, they will still report
+            // values most of which will be 0.
 
-              @Override
-              public <K, V, W extends BoundedWindow>
-                  MultimapSideInputHandler<K, V, W> forMultimapSideInput(
-                      String pTransformId,
-                      String sideInputId,
-                      KvCoder<K, V> elementCoder,
-                      Coder<W> windowCoder) {
-                return new MultimapSideInputHandler<K, V, W>() {
-                  @Override
-                  public Iterable<V> get(BoundedWindow window) {
-                    return null;
-                  }
+            SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
+            builder
+                .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAMESPACE, RemoteExecutionTest.class.getName())
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAME, MetricsDoFn.PROCESS_USER_COUNTER_NAME);
+            builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, testPTransformId);
+            builder.setInt64SumValue(1);
+            matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
 
-                  @Override
-                  public Coder<K> keyCoder() {
-                    return elementCoder.getKeyCoder();
-                  }
+            builder = new SimpleMonitoringInfoBuilder();
+            builder
+                .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAMESPACE, RemoteExecutionTest.class.getName())
+                .setLabel(MonitoringInfoConstants.Labels.NAME, MetricsDoFn.START_USER_COUNTER_NAME);
+            builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, testPTransformId);
+            builder.setInt64SumValue(10);
+            matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
 
-                  @Override
-                  public Coder<V> valueCoder() {
-                    return elementCoder.getValueCoder();
-                  }
+            builder = new SimpleMonitoringInfoBuilder();
+            builder
+                .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAMESPACE, RemoteExecutionTest.class.getName())
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAME, MetricsDoFn.FINISH_USER_COUNTER_NAME);
+            builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, testPTransformId);
+            matchers.add(not(MonitoringInfoMatchers.matchSetFields(builder.build())));
 
-                  @Override
-                  public Iterable<V> get(K key, W window) {
-                    return (Iterable) sideInputData;
-                  }
-                };
-              }
-            });
+            // User Distributions.
+            builder
+                .setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64)
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAMESPACE, RemoteExecutionTest.class.getName())
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAME,
+                    MetricsDoFn.PROCESS_USER_DISTRIBUTION_NAME);
+            builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, testPTransformId);
+            builder.setInt64DistributionValue(DistributionData.create(1, 1, 1, 1));
+            matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
 
-    String testPTransformId = "create/ParMultiDo(Anonymous)";
-    BundleProgressHandler progressHandler =
-        new BundleProgressHandler() {
-          @Override
-          public void onProgress(ProcessBundleProgressResponse progress) {}
+            builder = new SimpleMonitoringInfoBuilder();
+            builder
+                .setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64)
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAMESPACE, RemoteExecutionTest.class.getName())
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAME, MetricsDoFn.START_USER_DISTRIBUTION_NAME);
+            builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, testPTransformId);
+            builder.setInt64DistributionValue(DistributionData.create(10, 1, 10, 10));

Review comment:
       The values should only change if the test changes since they are expected to be emitted once. Only the msec counters are non-deterministic really.
   
   Also, the MonitoringInfo matcher only compares set fields and ignores fields that aren't set.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org