You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by xi...@apache.org on 2023/02/13 17:49:10 UTC

[beam] branch master updated: Support gauge metrics in portable mode (#25396)

This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a0bff5a049 Support gauge metrics in portable mode (#25396)
0a0bff5a049 is described below

commit 0a0bff5a049a17864e66bc34f36e40836238a653
Author: Katie Liu <ka...@linkedin.com>
AuthorDate: Mon Feb 13 09:49:01 2023 -0800

    Support gauge metrics in portable mode (#25396)
---
 .../runners/core/metrics/MetricsContainerImpl.java | 47 +++++++++++++++++++---
 .../core/metrics/MonitoringInfoConstants.java      |  2 +
 .../core/metrics/MetricsContainerImplTest.java     | 35 ++++++++++++++++
 3 files changed, 78 insertions(+), 6 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
index 9c0c2a46c27..c23c2bbfa08 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
@@ -25,10 +25,12 @@ import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decod
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter;
 import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
@@ -281,7 +283,7 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {
    * @return The MonitoringInfo generated from the distribution metricUpdate.
    */
   private @Nullable MonitoringInfo distributionUpdateToMonitoringInfo(
-      MetricUpdate<org.apache.beam.runners.core.metrics.DistributionData> metricUpdate) {
+      MetricUpdate<DistributionData> metricUpdate) {
     SimpleMonitoringInfoBuilder builder = distributionToMonitoringMetadata(metricUpdate.getKey());
     if (builder == null) {
       return null;
@@ -290,11 +292,33 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {
     return builder.build();
   }
 
+  /** @return The MonitoringInfo metadata from the gauge metric. */
+  private @Nullable SimpleMonitoringInfoBuilder gaugeToMonitoringMetadata(MetricKey metricKey) {
+    return metricToMonitoringMetadata(
+        metricKey,
+        MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE,
+        MonitoringInfoConstants.Urns.USER_LATEST_INT64);
+  }
+
+  /**
+   * @param metricUpdate The gauge update to convert.
+   * @return The MonitoringInfo generated from the gauge metricUpdate.
+   */
+  private @Nullable MonitoringInfo gaugeUpdateToMonitoringInfo(
+      MetricUpdate<GaugeData> metricUpdate) {
+    SimpleMonitoringInfoBuilder builder = gaugeToMonitoringMetadata(metricUpdate.getKey());
+    if (builder == null) {
+      return null;
+    }
+    builder.setInt64LatestValue(metricUpdate.getUpdate());
+    return builder.build();
+  }
+
   /** Return the cumulative values for any metrics in this container as MonitoringInfos. */
   @Override
   public Iterable<MonitoringInfo> getMonitoringInfos() {
     // Extract user metrics and store as MonitoringInfos.
-    ArrayList<MonitoringInfo> monitoringInfos = new ArrayList<MonitoringInfo>();
+    List<MonitoringInfo> monitoringInfos = new ArrayList<>();
     MetricUpdates metricUpdates = this.getUpdates();
 
     for (MetricUpdate<Long> metricUpdate : metricUpdates.counterUpdates()) {
@@ -304,13 +328,19 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {
       }
     }
 
-    for (MetricUpdate<org.apache.beam.runners.core.metrics.DistributionData> metricUpdate :
-        metricUpdates.distributionUpdates()) {
+    for (MetricUpdate<DistributionData> metricUpdate : metricUpdates.distributionUpdates()) {
       MonitoringInfo mi = distributionUpdateToMonitoringInfo(metricUpdate);
       if (mi != null) {
         monitoringInfos.add(mi);
       }
     }
+
+    for (MetricUpdate<GaugeData> metricUpdate : metricUpdates.gaugeUpdates()) {
+      MonitoringInfo mi = gaugeUpdateToMonitoringInfo(metricUpdate);
+      if (mi != null) {
+        monitoringInfos.add(mi);
+      }
+    }
     return monitoringInfos;
   }
 
@@ -324,14 +354,19 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {
         builder.put(shortId, encodeInt64Counter(metricUpdate.getUpdate()));
       }
     }
-    for (MetricUpdate<org.apache.beam.runners.core.metrics.DistributionData> metricUpdate :
-        metricUpdates.distributionUpdates()) {
+    for (MetricUpdate<DistributionData> metricUpdate : metricUpdates.distributionUpdates()) {
       String shortId =
           getShortId(metricUpdate.getKey(), this::distributionToMonitoringMetadata, shortIds);
       if (shortId != null) {
         builder.put(shortId, encodeInt64Distribution(metricUpdate.getUpdate()));
       }
     }
+    for (MetricUpdate<GaugeData> metricUpdate : metricUpdates.gaugeUpdates()) {
+      String shortId = getShortId(metricUpdate.getKey(), this::gaugeToMonitoringMetadata, shortIds);
+      if (shortId != null) {
+        builder.put(shortId, encodeInt64Gauge(metricUpdate.getUpdate()));
+      }
+    }
     return builder.build();
   }
 
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
index 2808ae58cf1..968c58ab331 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
@@ -43,6 +43,8 @@ public final class MonitoringInfoConstants {
     public static final String FINISH_BUNDLE_MSECS =
         "beam:metric:pardo_execution_time:finish_bundle_msecs:v1";
     public static final String TOTAL_MSECS = extractUrn(MonitoringInfoSpecs.Enum.TOTAL_MSECS);
+    public static final String USER_LATEST_INT64 =
+        extractUrn(MonitoringInfoSpecs.Enum.USER_LATEST_INT64);
     public static final String USER_SUM_INT64 = extractUrn(MonitoringInfoSpecs.Enum.USER_SUM_INT64);
     public static final String USER_SUM_DOUBLE =
         extractUrn(MonitoringInfoSpecs.Enum.USER_SUM_DOUBLE);
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
index 2642a9191ff..146b7df10f0 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
@@ -31,6 +31,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
@@ -234,6 +235,40 @@ public class MetricsContainerImplTest {
     assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build(), builder2.build()));
   }
 
+  @Test
+  public void testMonitoringInfosArePopulatedForUserGauges() {
+    MetricsContainerImpl testObject = new MetricsContainerImpl("step1");
+    GaugeCell gaugeCell1 = testObject.getGauge(MetricName.named("ns", "name1"));
+    GaugeCell gaugeCell2 = testObject.getGauge(MetricName.named("ns", "name2"));
+    GaugeData gaugeData1 = GaugeData.create(3L);
+    GaugeData gaugeData2 = GaugeData.create(4L);
+    gaugeCell1.update(gaugeData1);
+    gaugeCell2.update(gaugeData2);
+
+    SimpleMonitoringInfoBuilder builder1 = new SimpleMonitoringInfoBuilder();
+    builder1
+        .setUrn(MonitoringInfoConstants.Urns.USER_LATEST_INT64)
+        .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "ns")
+        .setLabel(MonitoringInfoConstants.Labels.NAME, "name1")
+        .setInt64LatestValue(gaugeData1)
+        .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");
+
+    SimpleMonitoringInfoBuilder builder2 = new SimpleMonitoringInfoBuilder();
+    builder2
+        .setUrn(MonitoringInfoConstants.Urns.USER_LATEST_INT64)
+        .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "ns")
+        .setLabel(MonitoringInfoConstants.Labels.NAME, "name2")
+        .setInt64LatestValue(gaugeData2)
+        .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");
+
+    List<MonitoringInfo> actualMonitoringInfos = new ArrayList<>();
+    for (MonitoringInfo mi : testObject.getMonitoringInfos()) {
+      actualMonitoringInfos.add(mi);
+    }
+
+    assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build(), builder2.build()));
+  }
+
   @Test
   public void testMonitoringInfosArePopulatedForSystemDistributions() {
     MetricsContainerImpl testObject = new MetricsContainerImpl("step1");