You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by yh...@apache.org on 2023/10/18 16:02:21 UTC
[beam] branch master updated: Add MetricsContainer::getPerWorker{Counter|Histogram} (#28903) (#28923)
This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 130ad8b3ec6 Add MetricsContainer::getPerWorker{Counter|Histogram} (#28903) (#28923)
130ad8b3ec6 is described below
commit 130ad8b3ec6f1bab3d7c4e6bd5bb6111730a24f2
Author: JayajP <ja...@google.com>
AuthorDate: Wed Oct 18 09:02:11 2023 -0700
Add MetricsContainer::getPerWorker{Counter|Histogram} (#28903) (#28923)
* Add MetricsContainer::getPerWorker{Counter|Histogram}
* Add comments to NoOp{Counter|Histogram}.
---
.../dataflow/worker/DataflowMetricsContainer.java | 13 ++++++
.../dataflow/worker/StreamingDataflowWorker.java | 5 +++
.../worker/StreamingStepMetricsContainer.java | 35 +++++++++++++++
.../worker/StreamingStepMetricsContainerTest.java | 22 ++++++++++
.../apache/beam/sdk/metrics/MetricsContainer.java | 16 +++++++
.../org/apache/beam/sdk/metrics/NoOpCounter.java | 51 ++++++++++++++++++++++
.../org/apache/beam/sdk/metrics/NoOpHistogram.java | 42 ++++++++++++++++++
7 files changed, 184 insertions(+)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java
index 81517129c8e..c3e4fb1388b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java
@@ -22,9 +22,11 @@ import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.util.HistogramData;
/**
* An implementation of {@link MetricsContainer} that reads the current execution state (tracked in
@@ -56,6 +58,11 @@ public class DataflowMetricsContainer implements MetricsContainer {
return getCurrentContainer().getCounter(metricName);
}
+ /** Forwards the per-worker counter lookup to the container returned by getCurrentContainer(). */
+ @Override
+ public Counter getPerWorkerCounter(MetricName metricName) {
+ return getCurrentContainer().getPerWorkerCounter(metricName);
+ }
+
@Override
public Distribution getDistribution(MetricName metricName) {
return getCurrentContainer().getDistribution(metricName);
@@ -65,4 +72,10 @@ public class DataflowMetricsContainer implements MetricsContainer {
public Gauge getGauge(MetricName metricName) {
return getCurrentContainer().getGauge(metricName);
}
+
+ /**
+ * Forwards the per-worker histogram lookup, including the requested bucket type, to the
+ * container returned by getCurrentContainer().
+ */
+ @Override
+ public Histogram getPerWorkerHistogram(
+ MetricName metricName, HistogramData.BucketType bucketType) {
+ return getCurrentContainer().getPerWorkerHistogram(metricName, bucketType);
+ }
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 32b272c6728..4c1693d6138 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -465,6 +465,11 @@ public class StreamingDataflowWorker {
// metrics.
MetricsEnvironment.setProcessWideContainer(new MetricsLogger(null));
+ // When enabled, the Pipeline will record Per-Worker metrics that will be piped to WMW.
+ StreamingStepMetricsContainer.setEnablePerWorkerMetrics(
+ options.isEnableStreamingEngine()
+ && DataflowRunner.hasExperiment(options, "enable_per_worker_metrics"));
+
JvmInitializers.runBeforeProcessing(options);
worker.startStatusPages();
worker.start();
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
index 8c5b9c2f2b6..875a2d649ec 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
@@ -24,13 +24,17 @@ import java.util.Map.Entry;
import javax.annotation.Nonnull;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.GaugeCell;
+import org.apache.beam.runners.core.metrics.HistogramCell;
import org.apache.beam.runners.core.metrics.MetricsMap;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.util.HistogramData;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Function;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
@@ -47,14 +51,22 @@ public class StreamingStepMetricsContainer implements MetricsContainer {
private final String stepName;
+ private static Boolean enablePerWorkerMetrics;
+
private MetricsMap<MetricName, DeltaCounterCell> counters =
new MetricsMap<>(DeltaCounterCell::new);
+ private MetricsMap<MetricName, DeltaCounterCell> perWorkerCounters =
+ new MetricsMap<>(DeltaCounterCell::new);
+
private MetricsMap<MetricName, GaugeCell> gauges = new MetricsMap<>(GaugeCell::new);
private MetricsMap<MetricName, DeltaDistributionCell> distributions =
new MetricsMap<>(DeltaDistributionCell::new);
+ private MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell> perWorkerHistograms =
+ new MetricsMap<>(HistogramCell::new);
+
private StreamingStepMetricsContainer(String stepName) {
this.stepName = stepName;
}
@@ -73,6 +85,15 @@ public class StreamingStepMetricsContainer implements MetricsContainer {
return counters.get(metricName);
}
+  /**
+   * Returns the per-worker {@link Counter} for {@code metricName}.
+   *
+   * <p>When per-worker metrics are enabled the counter is looked up in {@code perWorkerCounters};
+   * otherwise the {@link MetricsContainer} interface default is returned.
+   */
+  @Override
+  public Counter getPerWorkerCounter(MetricName metricName) {
+    // The flag is a boxed Boolean that stays null until setEnablePerWorkerMetrics is called.
+    // Compare against TRUE instead of auto-unboxing so that an unset flag means "disabled"
+    // rather than a NullPointerException.
+    if (Boolean.TRUE.equals(enablePerWorkerMetrics)) {
+      return perWorkerCounters.get(metricName);
+    }
+    return MetricsContainer.super.getPerWorkerCounter(metricName);
+  }
+
@Override
public Distribution getDistribution(MetricName metricName) {
return distributions.get(metricName);
@@ -83,6 +104,16 @@ public class StreamingStepMetricsContainer implements MetricsContainer {
return gauges.get(metricName);
}
+  /**
+   * Returns the per-worker {@link Histogram} for {@code metricName} and {@code bucketType}.
+   *
+   * <p>When per-worker metrics are enabled the histogram is looked up in {@code
+   * perWorkerHistograms}, keyed by the (name, bucket type) pair; otherwise the {@link
+   * MetricsContainer} interface default is returned.
+   */
+  @Override
+  public Histogram getPerWorkerHistogram(
+      MetricName metricName, HistogramData.BucketType bucketType) {
+    // The flag is a boxed Boolean that stays null until setEnablePerWorkerMetrics is called.
+    // Compare against TRUE instead of auto-unboxing so that an unset flag means "disabled"
+    // rather than a NullPointerException.
+    if (Boolean.TRUE.equals(enablePerWorkerMetrics)) {
+      return perWorkerHistograms.get(KV.of(metricName, bucketType));
+    }
+    return MetricsContainer.super.getPerWorkerHistogram(metricName, bucketType);
+  }
+
public Iterable<CounterUpdate> extractUpdates() {
return counterUpdates().append(distributionUpdates());
}
@@ -142,4 +173,8 @@ public class StreamingStepMetricsContainer implements MetricsContainer {
.getContainers()
.transformAndConcat(StreamingStepMetricsContainer::extractUpdates);
}
+
+  /**
+   * Turns per-worker metrics on or off. The flag is held in a static field, so the setting is
+   * shared by every {@code StreamingStepMetricsContainer} instance.
+   *
+   * @param enablePerWorkerMetrics {@code true} to have the per-worker getters return this
+   *     container's own metrics instead of the interface defaults; {@code null} is treated as
+   *     {@code false}
+   */
+  public static void setEnablePerWorkerMetrics(Boolean enablePerWorkerMetrics) {
+    // Normalize null to false so the stored flag can always be unboxed safely.
+    StreamingStepMetricsContainer.enablePerWorkerMetrics =
+        Boolean.TRUE.equals(enablePerWorkerMetrics);
+  }
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java
index 1a4c43905d2..9e6d45a2351 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.dataflow.worker;
import static org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor.longToSplitInt;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;
@@ -33,6 +34,9 @@ import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.O
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.NoOpCounter;
+import org.apache.beam.sdk.metrics.NoOpHistogram;
+import org.apache.beam.sdk.util.HistogramData;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -178,4 +182,22 @@ public class StreamingStepMetricsContainerTest {
.setMin(longToSplitInt(3))
.setSum(longToSplitInt(3)))));
}
+
+  /**
+   * The per-worker getters return the shared no-op metrics while the feature is disabled, and
+   * something other than the no-op types once it is enabled.
+   */
+  @Test
+  public void testPerWorkerMetrics() {
+    // Disabled: both getters must fall through to the no-op singletons.
+    StreamingStepMetricsContainer.setEnablePerWorkerMetrics(false);
+    MetricsContainer container = registry.getContainer("test_step");
+    assertThat(container.getPerWorkerCounter(name1), sameInstance(NoOpCounter.getInstance()));
+    HistogramData.BucketType bucket = HistogramData.LinearBuckets.of(1, 1, 1);
+    assertThat(
+        container.getPerWorkerHistogram(name1, bucket),
+        sameInstance(NoOpHistogram.getInstance()));
+
+    // Enabled: the same container must no longer hand back the no-op types.
+    StreamingStepMetricsContainer.setEnablePerWorkerMetrics(true);
+    assertThat(container.getPerWorkerCounter(name1), not(instanceOf(NoOpCounter.class)));
+    assertThat(
+        container.getPerWorkerHistogram(name1, bucket), not(instanceOf(NoOpHistogram.class)));
+  }
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
index e93f8677b81..f48b9195c37 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
@@ -33,6 +33,14 @@ public interface MetricsContainer extends Serializable {
*/
Counter getCounter(MetricName metricName);
+ /**
+ * Return the {@link Counter} that should be used for implementing the given per-worker {@code
+ * metricName} in this container. The default implementation returns the shared {@link
+ * NoOpCounter} instance.
+ */
+ default Counter getPerWorkerCounter(MetricName metricName) {
+ return NoOpCounter.getInstance();
+ }
+
/**
* Return the {@link Distribution} that should be used for implementing the given {@code
* metricName} in this container.
@@ -52,6 +60,14 @@ public interface MetricsContainer extends Serializable {
default Histogram getHistogram(MetricName metricName, HistogramData.BucketType bucketType) {
throw new RuntimeException("Histogram metric is not supported yet.");
}
+ /**
+ * Return the {@link Histogram} that should be used for implementing the given per-worker {@code
+ * metricName} in this container. The default implementation ignores {@code bucketType} and
+ * returns the shared {@link NoOpHistogram} instance.
+ */
+ default Histogram getPerWorkerHistogram(
+ MetricName metricName, HistogramData.BucketType bucketType) {
+ return NoOpHistogram.getInstance();
+ }
/** Return the cumulative values for any metrics in this container as MonitoringInfos. */
default Iterable<MetricsApi.MonitoringInfo> getMonitoringInfos() {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpCounter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpCounter.java
new file mode 100644
index 00000000000..ab4fa685f9c
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpCounter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+/**
+ * A {@link Counter} whose mutators do nothing. It is the fallback handed out when a {@link
+ * MetricsContainer} implementation does not override a Counter getter.
+ */
+public class NoOpCounter implements Counter {
+
+  private static final NoOpCounter INSTANCE = new NoOpCounter();
+  private static final MetricName NAME = MetricName.named(NoOpCounter.class, "singleton");
+
+  /** Not instantiable from outside; obtain the shared instance via {@link #getInstance()}. */
+  private NoOpCounter() {}
+
+  /** Returns the single shared instance. */
+  public static NoOpCounter getInstance() {
+    return INSTANCE;
+  }
+
+  /** Returns the fixed name shared by all uses of this no-op counter. */
+  @Override
+  public MetricName getName() {
+    return NAME;
+  }
+
+  @Override
+  public void inc() {
+    // Intentionally empty.
+  }
+
+  @Override
+  public void inc(long n) {
+    // Intentionally empty.
+  }
+
+  @Override
+  public void dec() {
+    // Intentionally empty.
+  }
+
+  @Override
+  public void dec(long n) {
+    // Intentionally empty.
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpHistogram.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpHistogram.java
new file mode 100644
index 00000000000..a088223ffe2
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpHistogram.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+/**
+ * A {@link Histogram} that discards every update. It is the fallback handed out when a {@link
+ * MetricsContainer} implementation does not override a Histogram getter.
+ */
+public class NoOpHistogram implements Histogram {
+
+  private static final NoOpHistogram INSTANCE = new NoOpHistogram();
+  private static final MetricName NAME = MetricName.named(NoOpHistogram.class, "singleton");
+
+  /** Not instantiable from outside; obtain the shared instance via {@link #getInstance()}. */
+  private NoOpHistogram() {}
+
+  /** Returns the single shared instance. */
+  public static NoOpHistogram getInstance() {
+    return INSTANCE;
+  }
+
+  /** Returns the fixed name shared by all uses of this no-op histogram. */
+  @Override
+  public MetricName getName() {
+    return NAME;
+  }
+
+  @Override
+  public void update(double value) {
+    // Intentionally empty.
+  }
+}