You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by xi...@apache.org on 2023/02/13 17:49:10 UTC
[beam] branch master updated: Support gauge metrics in portable mode (#25396)
This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0a0bff5a049 Support gauge metrics in portable mode (#25396)
0a0bff5a049 is described below
commit 0a0bff5a049a17864e66bc34f36e40836238a653
Author: Katie Liu <ka...@linkedin.com>
AuthorDate: Mon Feb 13 09:49:01 2023 -0800
Support gauge metrics in portable mode (#25396)
---
.../runners/core/metrics/MetricsContainerImpl.java | 47 +++++++++++++++++++---
.../core/metrics/MonitoringInfoConstants.java | 2 +
.../core/metrics/MetricsContainerImplTest.java | 35 ++++++++++++++++
3 files changed, 78 insertions(+), 6 deletions(-)
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
index 9c0c2a46c27..c23c2bbfa08 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
@@ -25,10 +25,12 @@ import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decod
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
@@ -281,7 +283,7 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {
* @return The MonitoringInfo generated from the distribution metricUpdate.
*/
private @Nullable MonitoringInfo distributionUpdateToMonitoringInfo(
- MetricUpdate<org.apache.beam.runners.core.metrics.DistributionData> metricUpdate) {
+ MetricUpdate<DistributionData> metricUpdate) {
SimpleMonitoringInfoBuilder builder = distributionToMonitoringMetadata(metricUpdate.getKey());
if (builder == null) {
return null;
@@ -290,11 +292,33 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {
return builder.build();
}
+ /** @return The MonitoringInfo metadata from the gauge metric. */
+ private @Nullable SimpleMonitoringInfoBuilder gaugeToMonitoringMetadata(MetricKey metricKey) {
+ return metricToMonitoringMetadata(
+ metricKey,
+ MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE,
+ MonitoringInfoConstants.Urns.USER_LATEST_INT64);
+ }
+
+ /**
+ * @param metricUpdate the gauge metric update to convert.
+ * @return The MonitoringInfo generated from the gauge metricUpdate, or null if no metadata builder is available for its key.
+ */
+ private @Nullable MonitoringInfo gaugeUpdateToMonitoringInfo(
+ MetricUpdate<GaugeData> metricUpdate) {
+ SimpleMonitoringInfoBuilder builder = gaugeToMonitoringMetadata(metricUpdate.getKey());
+ if (builder == null) {
+ return null;
+ }
+ builder.setInt64LatestValue(metricUpdate.getUpdate());
+ return builder.build();
+ }
+
/** Return the cumulative values for any metrics in this container as MonitoringInfos. */
@Override
public Iterable<MonitoringInfo> getMonitoringInfos() {
// Extract user metrics and store as MonitoringInfos.
- ArrayList<MonitoringInfo> monitoringInfos = new ArrayList<MonitoringInfo>();
+ List<MonitoringInfo> monitoringInfos = new ArrayList<>();
MetricUpdates metricUpdates = this.getUpdates();
for (MetricUpdate<Long> metricUpdate : metricUpdates.counterUpdates()) {
@@ -304,13 +328,19 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {
}
}
- for (MetricUpdate<org.apache.beam.runners.core.metrics.DistributionData> metricUpdate :
- metricUpdates.distributionUpdates()) {
+ for (MetricUpdate<DistributionData> metricUpdate : metricUpdates.distributionUpdates()) {
MonitoringInfo mi = distributionUpdateToMonitoringInfo(metricUpdate);
if (mi != null) {
monitoringInfos.add(mi);
}
}
+
+ for (MetricUpdate<GaugeData> metricUpdate : metricUpdates.gaugeUpdates()) {
+ MonitoringInfo mi = gaugeUpdateToMonitoringInfo(metricUpdate);
+ if (mi != null) {
+ monitoringInfos.add(mi);
+ }
+ }
return monitoringInfos;
}
@@ -324,14 +354,19 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {
builder.put(shortId, encodeInt64Counter(metricUpdate.getUpdate()));
}
}
- for (MetricUpdate<org.apache.beam.runners.core.metrics.DistributionData> metricUpdate :
- metricUpdates.distributionUpdates()) {
+ for (MetricUpdate<DistributionData> metricUpdate : metricUpdates.distributionUpdates()) {
String shortId =
getShortId(metricUpdate.getKey(), this::distributionToMonitoringMetadata, shortIds);
if (shortId != null) {
builder.put(shortId, encodeInt64Distribution(metricUpdate.getUpdate()));
}
}
+ for (MetricUpdate<GaugeData> metricUpdate : metricUpdates.gaugeUpdates()) {
+ String shortId = getShortId(metricUpdate.getKey(), this::gaugeToMonitoringMetadata, shortIds);
+ if (shortId != null) {
+ builder.put(shortId, encodeInt64Gauge(metricUpdate.getUpdate()));
+ }
+ }
return builder.build();
}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
index 2808ae58cf1..968c58ab331 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
@@ -43,6 +43,8 @@ public final class MonitoringInfoConstants {
public static final String FINISH_BUNDLE_MSECS =
"beam:metric:pardo_execution_time:finish_bundle_msecs:v1";
public static final String TOTAL_MSECS = extractUrn(MonitoringInfoSpecs.Enum.TOTAL_MSECS);
+ public static final String USER_LATEST_INT64 =
+ extractUrn(MonitoringInfoSpecs.Enum.USER_LATEST_INT64);
public static final String USER_SUM_INT64 = extractUrn(MonitoringInfoSpecs.Enum.USER_SUM_INT64);
public static final String USER_SUM_DOUBLE =
extractUrn(MonitoringInfoSpecs.Enum.USER_SUM_DOUBLE);
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
index 2642a9191ff..146b7df10f0 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
@@ -31,6 +31,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
@@ -234,6 +235,40 @@ public class MetricsContainerImplTest {
assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build(), builder2.build()));
}
+ @Test
+ public void testMonitoringInfosArePopulatedForUserGauges() {
+ MetricsContainerImpl testObject = new MetricsContainerImpl("step1");
+ GaugeCell gaugeCell1 = testObject.getGauge(MetricName.named("ns", "name1"));
+ GaugeCell gaugeCell2 = testObject.getGauge(MetricName.named("ns", "name2"));
+ GaugeData gaugeData1 = GaugeData.create(3L);
+ GaugeData gaugeData2 = GaugeData.create(4L);
+ gaugeCell1.update(gaugeData1);
+ gaugeCell2.update(gaugeData2);
+
+ SimpleMonitoringInfoBuilder builder1 = new SimpleMonitoringInfoBuilder();
+ builder1
+ .setUrn(MonitoringInfoConstants.Urns.USER_LATEST_INT64)
+ .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "ns")
+ .setLabel(MonitoringInfoConstants.Labels.NAME, "name1")
+ .setInt64LatestValue(gaugeData1)
+ .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");
+
+ SimpleMonitoringInfoBuilder builder2 = new SimpleMonitoringInfoBuilder();
+ builder2
+ .setUrn(MonitoringInfoConstants.Urns.USER_LATEST_INT64)
+ .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "ns")
+ .setLabel(MonitoringInfoConstants.Labels.NAME, "name2")
+ .setInt64LatestValue(gaugeData2)
+ .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");
+
+ List<MonitoringInfo> actualMonitoringInfos = new ArrayList<>();
+ for (MonitoringInfo mi : testObject.getMonitoringInfos()) {
+ actualMonitoringInfos.add(mi);
+ }
+
+ assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build(), builder2.build()));
+ }
+
@Test
public void testMonitoringInfosArePopulatedForSystemDistributions() {
MetricsContainerImpl testObject = new MetricsContainerImpl("step1");