You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2021/05/06 22:00:52 UTC

[incubator-pinot] 01/01: Remove realtime metrics if it's destroyed

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch cleanup-realtime-segment-metric
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit d136011b7dec98ae789c68e923d6839934ad2d87
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Thu May 6 15:00:09 2021 -0700

    Remove realtime metrics if it's destroyed
---
 .../pinot/common/metrics/AbstractMetrics.java      | 59 +++++++++++++++++-----
 .../realtime/LLRealtimeSegmentDataManager.java     | 12 +++++
 2 files changed, 58 insertions(+), 13 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
index 44a53c0..1ed8e85 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
@@ -151,11 +151,12 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e
    */
   private void addValueToTimer(String fullTimerName, final long duration, final TimeUnit timeUnit) {
     final PinotMetricName metricName = PinotMetricUtils.makePinotMetricName(_clazz, fullTimerName);
-    PinotTimer timer = PinotMetricUtils.makePinotTimer(_metricsRegistry, metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
+    PinotTimer timer =
+        PinotMetricUtils.makePinotTimer(_metricsRegistry, metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
     if (timer != null) {
       timer.update(duration, timeUnit);
-    }  
-   }
+    }
+  }
 
   /**
    * Logs a value to a meter.
@@ -462,16 +463,48 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e
    * @param valueCallback The callback function used to retrieve the value of the gauge
    */
   public void addCallbackGauge(final String metricName, final Callable<Long> valueCallback) {
-    PinotMetricUtils.makeGauge(_metricsRegistry, PinotMetricUtils.makePinotMetricName(_clazz, _metricPrefix + metricName),
-        PinotMetricUtils.makePinotGauge(avoid -> {
-          try {
-            return valueCallback.call();
-          } catch (Exception e) {
-            LOGGER.error("Caught exception", e);
-            Utils.rethrowException(e);
-            throw new AssertionError("Should not reach this");
-          }
-        }));
+    PinotMetricUtils
+        .makeGauge(_metricsRegistry, PinotMetricUtils.makePinotMetricName(_clazz, _metricPrefix + metricName),
+            PinotMetricUtils.makePinotGauge(avoid -> {
+              try {
+                return valueCallback.call();
+              } catch (Exception e) {
+                LOGGER.error("Caught exception", e);
+                Utils.rethrowException(e);
+                throw new AssertionError("Should not reach this");
+              }
+            }));
+  }
+
+  /**
+   * Removes a table gauge given the table name and the gauge.
+   * @param tableName table name
+   * @param gauge the gauge to be removed
+   */
+  public void removeTableGauge(final String tableName, final G gauge) {
+    final String fullGaugeName;
+    String gaugeName = gauge.getGaugeName();
+    fullGaugeName = gaugeName + "." + getTableName(tableName);
+    removeGauge(fullGaugeName);
+  }
+
+  /**
+   * Removes a gauge from Pinot metrics.
+   * @param gaugeName gauge name
+   */
+  private void removeGauge(final String gaugeName) {
+    if (_gaugeValues.remove(gaugeName) != null) {
+      removeCallbackGauge(gaugeName);
+    }
+  }
+
+  /**
+   * Removes a callback gauge.
+   * @param metricName metric name
+   */
+  private void removeCallbackGauge(String metricName) {
+    PinotMetricUtils
+        .removeMetric(_metricsRegistry, PinotMetricUtils.makePinotMetricName(_clazz, _metricPrefix + metricName));
   }
 
   protected abstract QP[] getQueryPhases();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index b182f45..81fa9cb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -923,6 +923,17 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     }
   }
 
+  /**
+   * Cleans up the metrics that reflects the state of the realtime segment.
+   * This step is essential as the instance may not be the target location for some of the partitions.
+   * E.g. if the number of partitions increases, or a host swap is needed, the target location for some partitions may change,
+   * and the current host remains to run. In this case, the current server would still keep the state of the old partitions,
+   * which no longer resides in this host any more, thus causes false positive information to the metric system.
+   */
+  private void cleanupMetrics() {
+    _serverMetrics.removeTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING);
+  }
+
   protected void hold() {
     try {
       Thread.sleep(SegmentCompletionProtocol.MAX_HOLD_TIME_MS);
@@ -1083,6 +1094,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     }
     _realtimeSegment.destroy();
     closeStreamConsumers();
+    cleanupMetrics();
   }
 
   protected void start() {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org