You are viewing a plain text version of this content; the canonical link to it is not included in this copy.
Posted to commits@beam.apache.org by yh...@apache.org on 2023/10/18 16:02:21 UTC

[beam] branch master updated: Add MetricsContainer::getPerWorker{Counter|Histogram} (#28903) (#28923)

This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 130ad8b3ec6 Add MetricsContainer::getPerWorker{Counter|Histogram} (#28903) (#28923)
130ad8b3ec6 is described below

commit 130ad8b3ec6f1bab3d7c4e6bd5bb6111730a24f2
Author: JayajP <ja...@google.com>
AuthorDate: Wed Oct 18 09:02:11 2023 -0700

    Add MetricsContainer::getPerWorker{Counter|Histogram} (#28903) (#28923)
    
    * Add MetricsContainer::getPerWorker{Counter|Histogram}
    
    * Add comments to NoOp{Counter|Histogram}.
---
 .../dataflow/worker/DataflowMetricsContainer.java  | 13 ++++++
 .../dataflow/worker/StreamingDataflowWorker.java   |  5 +++
 .../worker/StreamingStepMetricsContainer.java      | 35 +++++++++++++++
 .../worker/StreamingStepMetricsContainerTest.java  | 22 ++++++++++
 .../apache/beam/sdk/metrics/MetricsContainer.java  | 16 +++++++
 .../org/apache/beam/sdk/metrics/NoOpCounter.java   | 51 ++++++++++++++++++++++
 .../org/apache/beam/sdk/metrics/NoOpHistogram.java | 42 ++++++++++++++++++
 7 files changed, 184 insertions(+)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java
index 81517129c8e..c3e4fb1388b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java
@@ -22,9 +22,11 @@ import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.Histogram;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.util.HistogramData;
 
 /**
  * An implementation of {@link MetricsContainer} that reads the current execution state (tracked in
@@ -56,6 +58,11 @@ public class DataflowMetricsContainer implements MetricsContainer {
     return getCurrentContainer().getCounter(metricName);
   }
 
+  @Override
+  public Counter getPerWorkerCounter(final MetricName metricName) {
+    return getCurrentContainer().getPerWorkerCounter(metricName);
+  }
+
   @Override
   public Distribution getDistribution(MetricName metricName) {
     return getCurrentContainer().getDistribution(metricName);
@@ -65,4 +72,10 @@ public class DataflowMetricsContainer implements MetricsContainer {
   public Gauge getGauge(MetricName metricName) {
     return getCurrentContainer().getGauge(metricName);
   }
+
+  @Override
+  public Histogram getPerWorkerHistogram(
+      final MetricName metricName, final HistogramData.BucketType bucketType) {
+    return getCurrentContainer().getPerWorkerHistogram(metricName, bucketType);
+  }
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 32b272c6728..4c1693d6138 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -465,6 +465,11 @@ public class StreamingDataflowWorker {
     // metrics.
     MetricsEnvironment.setProcessWideContainer(new MetricsLogger(null));
 
+    // When enabled, the Pipeline will record Per-Worker metrics that will be piped to WMW.
+    StreamingStepMetricsContainer.setEnablePerWorkerMetrics(
+        options.isEnableStreamingEngine()
+            && DataflowRunner.hasExperiment(options, "enable_per_worker_metrics"));
+
     JvmInitializers.runBeforeProcessing(options);
     worker.startStatusPages();
     worker.start();
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
index 8c5b9c2f2b6..875a2d649ec 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
@@ -24,13 +24,17 @@ import java.util.Map.Entry;
 import javax.annotation.Nonnull;
 import org.apache.beam.runners.core.metrics.DistributionData;
 import org.apache.beam.runners.core.metrics.GaugeCell;
+import org.apache.beam.runners.core.metrics.HistogramCell;
 import org.apache.beam.runners.core.metrics.MetricsMap;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.Histogram;
 import org.apache.beam.sdk.metrics.MetricKey;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.util.HistogramData;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Function;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
@@ -47,14 +51,22 @@ public class StreamingStepMetricsContainer implements MetricsContainer {
 
   private final String stepName;
 
+  private static boolean enablePerWorkerMetrics = false; // Primitive: unset means disabled.
+
   private MetricsMap<MetricName, DeltaCounterCell> counters =
       new MetricsMap<>(DeltaCounterCell::new);
 
+  private final MetricsMap<MetricName, DeltaCounterCell> perWorkerCounters =
+      new MetricsMap<>(DeltaCounterCell::new);
+
   private MetricsMap<MetricName, GaugeCell> gauges = new MetricsMap<>(GaugeCell::new);
 
   private MetricsMap<MetricName, DeltaDistributionCell> distributions =
       new MetricsMap<>(DeltaDistributionCell::new);
 
+  private final MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell>
+      perWorkerHistograms = new MetricsMap<>(HistogramCell::new);
+
   private StreamingStepMetricsContainer(String stepName) {
     this.stepName = stepName;
   }
@@ -73,6 +85,15 @@ public class StreamingStepMetricsContainer implements MetricsContainer {
     return counters.get(metricName);
   }
 
+  @Override
+  public Counter getPerWorkerCounter(MetricName metricName) {
+    // Per-worker metrics are opt-in; when disabled, defer to the interface's no-op default.
+    if (!enablePerWorkerMetrics) {
+      return MetricsContainer.super.getPerWorkerCounter(metricName);
+    }
+    return perWorkerCounters.get(metricName);
+  }
+
   @Override
   public Distribution getDistribution(MetricName metricName) {
     return distributions.get(metricName);
@@ -83,6 +104,16 @@ public class StreamingStepMetricsContainer implements MetricsContainer {
     return gauges.get(metricName);
   }
 
+  @Override
+  public Histogram getPerWorkerHistogram(
+      MetricName metricName, HistogramData.BucketType bucketType) {
+    // Per-worker metrics are opt-in; when disabled, defer to the interface's no-op default.
+    if (!enablePerWorkerMetrics) {
+      return MetricsContainer.super.getPerWorkerHistogram(metricName, bucketType);
+    }
+    return perWorkerHistograms.get(KV.of(metricName, bucketType));
+  }
+
   public Iterable<CounterUpdate> extractUpdates() {
     return counterUpdates().append(distributionUpdates());
   }
@@ -142,4 +173,8 @@ public class StreamingStepMetricsContainer implements MetricsContainer {
         .getContainers()
         .transformAndConcat(StreamingStepMetricsContainer::extractUpdates);
   }
+
+  public static void setEnablePerWorkerMetrics(boolean enablePerWorkerMetrics) {
+    StreamingStepMetricsContainer.enablePerWorkerMetrics = enablePerWorkerMetrics;
+  }
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java
index 1a4c43905d2..9e6d45a2351 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.dataflow.worker;
 import static org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor.longToSplitInt;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.sameInstance;
 
@@ -33,6 +34,9 @@ import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.O
 import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.NoOpCounter;
+import org.apache.beam.sdk.metrics.NoOpHistogram;
+import org.apache.beam.sdk.util.HistogramData;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -178,4 +182,22 @@ public class StreamingStepMetricsContainerTest {
                         .setMin(longToSplitInt(3))
                         .setSum(longToSplitInt(3)))));
   }
+
+  @Test
+  public void testPerWorkerMetrics() {
+    MetricsContainer c = registry.getContainer("test_step");
+    HistogramData.BucketType bucket = HistogramData.LinearBuckets.of(1, 1, 1);
+    try {
+      StreamingStepMetricsContainer.setEnablePerWorkerMetrics(false);
+      assertThat(c.getPerWorkerCounter(name1), sameInstance(NoOpCounter.getInstance()));
+      assertThat(c.getPerWorkerHistogram(name1, bucket), sameInstance(NoOpHistogram.getInstance()));
+
+      StreamingStepMetricsContainer.setEnablePerWorkerMetrics(true);
+      assertThat(c.getPerWorkerCounter(name1), not(instanceOf(NoOpCounter.class)));
+      assertThat(c.getPerWorkerHistogram(name1, bucket), not(instanceOf(NoOpHistogram.class)));
+    } finally {
+      // Reset the static flag so this test does not leak state into later tests.
+      StreamingStepMetricsContainer.setEnablePerWorkerMetrics(false);
+    }
+  }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
index e93f8677b81..f48b9195c37 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
@@ -33,6 +33,14 @@ public interface MetricsContainer extends Serializable {
    */
   Counter getCounter(MetricName metricName);
 
+  /**
+   * Return the {@link Counter} that should be used for implementing the given per-worker {@code
+   * metricName} in this container. The default implementation returns {@link NoOpCounter}.
+   */
+  default Counter getPerWorkerCounter(MetricName metricName) {
+    return NoOpCounter.getInstance();
+  }
+
   /**
    * Return the {@link Distribution} that should be used for implementing the given {@code
    * metricName} in this container.
@@ -52,6 +60,14 @@ public interface MetricsContainer extends Serializable {
   default Histogram getHistogram(MetricName metricName, HistogramData.BucketType bucketType) {
     throw new RuntimeException("Histogram metric is not supported yet.");
   }
+  /**
+   * Return the {@link Histogram} that should be used for implementing the given per-worker {@code
+   * metricName} in this container. The default implementation returns {@link NoOpHistogram}.
+   */
+  default Histogram getPerWorkerHistogram(
+      MetricName metricName, HistogramData.BucketType bucketType) {
+    return NoOpHistogram.getInstance();
+  }
 
   /** Return the cumulative values for any metrics in this container as MonitoringInfos. */
   default Iterable<MetricsApi.MonitoringInfo> getMonitoringInfos() {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpCounter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpCounter.java
new file mode 100644
index 00000000000..ab4fa685f9c
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpCounter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+/**
+ * A no-op implementation of {@link Counter}. This class exists to provide a default if an
+ * implementation of {@link MetricsContainer} does not override a {@link Counter} getter.
+ */
+public final class NoOpCounter implements Counter {
+  private static final NoOpCounter SINGLETON = new NoOpCounter();
+  private static final MetricName NAME = MetricName.named(NoOpCounter.class, "singleton");
+
+  private NoOpCounter() {}
+
+  @Override
+  public void inc() {}
+
+  @Override
+  public void inc(long n) {}
+
+  @Override
+  public void dec() {}
+
+  @Override
+  public void dec(long n) {}
+
+  @Override
+  public MetricName getName() {
+    return NAME;
+  }
+
+  /** Returns the shared, stateless no-op {@link Counter} instance. */
+  public static NoOpCounter getInstance() {
+    return SINGLETON;
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpHistogram.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpHistogram.java
new file mode 100644
index 00000000000..a088223ffe2
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpHistogram.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+/**
+ * A no-op implementation of {@link Histogram}. This class exists to provide a default if an
+ * implementation of {@link MetricsContainer} does not override a {@link Histogram} getter.
+ */
+public final class NoOpHistogram implements Histogram {
+  private static final NoOpHistogram SINGLETON = new NoOpHistogram();
+  private static final MetricName NAME = MetricName.named(NoOpHistogram.class, "singleton");
+
+  private NoOpHistogram() {}
+
+  @Override
+  public void update(double value) {}
+
+  @Override
+  public MetricName getName() {
+    return NAME;
+  }
+
+  /** Returns the shared, stateless no-op {@link Histogram} instance. */
+  public static NoOpHistogram getInstance() {
+    return SINGLETON;
+  }
+}