You are viewing a plain-text version of this content. The link to the canonical version is not preserved in this copy.
Posted to commits@pinot.apache.org by jl...@apache.org on 2021/11/06 00:56:49 UTC

[pinot] branch master updated: Clean up controller-table related metrics in ControllerPeriodicTask (#7557)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 91d2a95  Clean up controller-table related metrics in ControllerPeriodicTask (#7557)
91d2a95 is described below

commit 91d2a958945f0b5d862821a216b0cba065be8461
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Fri Nov 5 17:56:33 2021 -0700

    Clean up controller-table related metrics in ControllerPeriodicTask (#7557)
    
    Co-authored-by: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
---
 .../pinot/common/metrics/ValidationMetrics.java    | 107 +++++++++++++++++----
 .../controller/helix/SegmentStatusChecker.java     |  16 +++
 .../core/periodictask/ControllerPeriodicTask.java  |  14 +++
 .../validation/OfflineSegmentIntervalChecker.java  |  16 +++
 .../RealtimeSegmentValidationManager.java          |  10 ++
 5 files changed, 145 insertions(+), 18 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java
index efe3c82..b47d068 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java
@@ -136,8 +136,16 @@ public class ValidationMetrics {
    * @param missingSegmentCount The number of missing segments
    */
   public void updateMissingSegmentCountGauge(final String resource, final int missingSegmentCount) {
-    final String fullGaugeName = makeGaugeName(resource, "missingSegmentCount");
-    makeGauge(fullGaugeName, makeMetricName(fullGaugeName), _storedValueGaugeFactory, missingSegmentCount);
+    makeGauge(resource, ValidationMetricName.MISSING_SEGMENT_COUNT, _storedValueGaugeFactory, missingSegmentCount);
+  }
+
+  /**
+   * Cleans up the missing segment count gauge.
+   *
+   * @param resource The resource for which the gauge is removed
+   */
+  public void cleanupMissingSegmentCountGauge(final String resource) {
+    removeGauge(resource, ValidationMetricName.MISSING_SEGMENT_COUNT);
   }
 
   /**
@@ -148,12 +156,20 @@ public class ValidationMetrics {
    *                               if there is no such time.
    */
   public void updateOfflineSegmentDelayGauge(final String resource, final long lastOfflineSegmentTime) {
-    final String fullGaugeNameHours = makeGaugeName(resource, "offlineSegmentDelayHours");
-    makeGauge(fullGaugeNameHours, makeMetricName(fullGaugeNameHours), _currentTimeMillisDeltaGaugeHoursFactory,
+    makeGauge(resource, ValidationMetricName.OFFLINE_SEGMENT_DELAY_HOURS, _currentTimeMillisDeltaGaugeHoursFactory,
         lastOfflineSegmentTime);
   }
 
   /**
+   * Cleans up offline segment delay gauge.
+   *
+   * @param resource The resource for which the gauge is removed
+   */
+  public void cleanupOfflineSegmentDelayGauge(final String resource) {
+    removeGauge(resource, ValidationMetricName.OFFLINE_SEGMENT_DELAY_HOURS);
+  }
+
+  /**
    * Updates the last push time gauge.
    *
    * @param resource The resource for which the gauge is updated
@@ -161,20 +177,36 @@ public class ValidationMetrics {
    *                           such time.
    */
   public void updateLastPushTimeGauge(final String resource, final long lastPushTimeMillis) {
-    final String fullGaugeNameHours = makeGaugeName(resource, "lastPushTimeDelayHours");
-    makeGauge(fullGaugeNameHours, makeMetricName(fullGaugeNameHours), _currentTimeMillisDeltaGaugeHoursFactory,
+    makeGauge(resource, ValidationMetricName.LAST_PUSH_TIME_DELAY_HOURS, _currentTimeMillisDeltaGaugeHoursFactory,
         lastPushTimeMillis);
   }
 
   /**
+   * Cleans up the last push time gauge.
+   *
+   * @param resource The resource for which the gauge is removed
+   */
+  public void cleanupLastPushTimeGauge(final String resource) {
+    removeGauge(resource, ValidationMetricName.LAST_PUSH_TIME_DELAY_HOURS);
+  }
+
+  /**
    * Updates the total document count gauge.
    *
    * @param resource The resource for which the gauge is updated
    * @param documentCount Total document count for the given resource name or table name
    */
   public void updateTotalDocumentCountGauge(final String resource, final long documentCount) {
-    final String fullGaugeName = makeGaugeName(resource, "TotalDocumentCount");
-    makeGauge(fullGaugeName, makeMetricName(fullGaugeName), _storedValueGaugeFactory, documentCount);
+    makeGauge(resource, ValidationMetricName.TOTAL_DOCUMENT_COUNT, _storedValueGaugeFactory, documentCount);
+  }
+
+  /**
+   * Cleans up the total document count gauge.
+   *
+   * @param resource The resource for which the gauge is removed
+   */
+  public void cleanupTotalDocumentCountGauge(final String resource) {
+    removeGauge(resource, ValidationMetricName.TOTAL_DOCUMENT_COUNT);
   }
 
   /**
@@ -184,8 +216,7 @@ public class ValidationMetrics {
    * @param partitionCount Number of partitions that do not have any segment in CONSUMING state.
    */
   public void updateNonConsumingPartitionCountMetric(final String resource, final int partitionCount) {
-    final String fullGaugeName = makeGaugeName(resource, "NonConsumingPartitionCount");
-    makeGauge(fullGaugeName, makeMetricName(fullGaugeName), _storedValueGaugeFactory, partitionCount);
+    makeGauge(resource, ValidationMetricName.NON_CONSUMING_PARTITION_COUNT, _storedValueGaugeFactory, partitionCount);
   }
 
   /**
@@ -195,8 +226,16 @@ public class ValidationMetrics {
    * @param segmentCount Total segment count for the given resource name or table name
    */
   public void updateSegmentCountGauge(final String resource, final long segmentCount) {
-    final String fullGaugeName = makeGaugeName(resource, "SegmentCount");
-    makeGauge(fullGaugeName, makeMetricName(fullGaugeName), _storedValueGaugeFactory, segmentCount);
+    makeGauge(resource, ValidationMetricName.SEGMENT_COUNT, _storedValueGaugeFactory, segmentCount);
+  }
+
+  /**
+   * Cleans up the segment count gauge.
+   *
+   * @param resource The resource for which the gauge is removed
+   */
+  public void cleanupSegmentCountGauge(final String resource) {
+    removeGauge(resource, ValidationMetricName.SEGMENT_COUNT);
   }
 
   @VisibleForTesting
@@ -208,17 +247,27 @@ public class ValidationMetrics {
     return PinotMetricUtils.makePinotMetricName(ValidationMetrics.class, gaugeName);
   }
 
-  private void makeGauge(final String gaugeName, final PinotMetricName metricName, final GaugeFactory<?> gaugeFactory,
-      final long value) {
-    if (!_gaugeValues.containsKey(gaugeName)) {
-      _gaugeValues.put(gaugeName, value);
-      PinotMetricUtils.makeGauge(_metricsRegistry, metricName, gaugeFactory.buildGauge(gaugeName));
+  private void makeGauge(final String resource, final ValidationMetricName validationMetricName,
+      final GaugeFactory<?> gaugeFactory, final long value) {
+    final String fullGaugeName = makeGaugeName(resource, validationMetricName.getMetricName());
+    PinotMetricName metricName = makeMetricName(fullGaugeName);
+    if (!_gaugeValues.containsKey(fullGaugeName)) {
+      _gaugeValues.put(fullGaugeName, value);
+      PinotMetricUtils.makeGauge(_metricsRegistry, metricName, gaugeFactory.buildGauge(fullGaugeName));
       _metricNames.add(metricName);
     } else {
-      _gaugeValues.put(gaugeName, value);
+      _gaugeValues.put(fullGaugeName, value);
     }
   }
 
+  private void removeGauge(final String resource, final ValidationMetricName metric) {
+    final String gaugeName = makeGaugeName(resource, metric.getMetricName());
+    final PinotMetricName registeredName = makeMetricName(gaugeName);
+    PinotMetricUtils.removeMetric(_metricsRegistry, registeredName);
+    _metricNames.remove(registeredName);
+    _gaugeValues.remove(gaugeName);
+  }
+
   /**
    * Unregisters all validation metrics.
    */
@@ -239,4 +288,26 @@ public class ValidationMetrics {
     }
     return value;
   }
+
+  /**
+   * Per-resource validation gauge suffixes; each is combined with the resource name via makeGaugeName.
+   */
+  public enum ValidationMetricName {
+    MISSING_SEGMENT_COUNT("missingSegmentCount"),
+    OFFLINE_SEGMENT_DELAY_HOURS("offlineSegmentDelayHours"),
+    LAST_PUSH_TIME_DELAY_HOURS("lastPushTimeDelayHours"),
+    TOTAL_DOCUMENT_COUNT("TotalDocumentCount"),
+    NON_CONSUMING_PARTITION_COUNT("NonConsumingPartitionCount"),
+    SEGMENT_COUNT("SegmentCount");
+
+    private final String _suffix;
+
+    ValidationMetricName(String suffix) {
+      _suffix = suffix;
+    }
+
+    public String getMetricName() {
+      return _suffix;
+    }
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index d7d74f9..93350fe 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -240,6 +240,22 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
     }
   }
 
+  @Override
+  protected void nonLeaderCleanup(List<String> tableNamesWithType) {
+    // This controller does not lead these tables, so drop their per-table gauges to avoid stale values.
+    for (String tableNameWithType : tableNamesWithType) {
+      _controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS);
+      _controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS);
+      _controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE);
+
+      _controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_SIZE);
+      _controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_BYTE_SIZE);
+      _controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.SEGMENT_COUNT);
+
+      _controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE);
+    }
+  }
+
   private void setStatusToDefault() {
     List<String> allTableNames = _pinotHelixResourceManager.getAllTables();
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 9d7a676..439f8be 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -65,11 +65,14 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
 
       // Process the tables that are managed by this controller
       List<String> tablesToProcess = new ArrayList<>();
+      List<String> nonLeaderForTables = new ArrayList<>();
       if (propTableNameWithType == null) {
         // Table name is not available, so task should run on all tables for which this controller is the lead.
         for (String tableNameWithType : _pinotHelixResourceManager.getAllTables()) {
           if (_leadControllerManager.isLeaderForTable(tableNameWithType)) {
             tablesToProcess.add(tableNameWithType);
+          } else {
+            nonLeaderForTables.add(tableNameWithType);
           }
         }
       } else {
@@ -82,6 +85,9 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
       if (!tablesToProcess.isEmpty()) {
         processTables(tablesToProcess);
       }
+      if (!nonLeaderForTables.isEmpty()) {
+        nonLeaderCleanup(nonLeaderForTables);
+      }
     } catch (Exception e) {
       LOGGER.error("Caught exception while running task: {}", _taskName, e);
       _controllerMetrics.addMeteredTableValue(_taskName, ControllerMeter.CONTROLLER_PERIODIC_TASK_ERROR, 1L);
@@ -156,4 +162,12 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
    */
   protected void postprocess() {
   }
+
+  /**
+   * Can be overridden to clean up state (e.g. metrics) for tables this controller isn't the leader for.
+   *
+   * @param tableNamesWithType the table names that the current controller isn't the leader for
+   */
+  protected void nonLeaderCleanup(List<String> tableNamesWithType) {
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index 1e11d1e..1b08c01 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -136,6 +136,22 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask<Void>
     _validationMetrics.updateSegmentCountGauge(offlineTableName, numSegments);
   }
 
+  @Override
+  protected void nonLeaderCleanup(List<String> tableNamesWithType) {
+    for (String tableNameWithType : tableNamesWithType) {
+      if (TableNameBuilder.getTableTypeFromTableName(tableNameWithType) != TableType.OFFLINE) {
+        continue;
+      }
+      // TODO: split ValidationMetricName into OFFLINE and REALTIME groups so the cleanup below can
+      //  iterate over enum values instead of listing every gauge by hand.
+      _validationMetrics.cleanupMissingSegmentCountGauge(tableNameWithType);
+      _validationMetrics.cleanupOfflineSegmentDelayGauge(tableNameWithType);
+      _validationMetrics.cleanupLastPushTimeGauge(tableNameWithType);
+      _validationMetrics.cleanupTotalDocumentCountGauge(tableNameWithType);
+      _validationMetrics.cleanupSegmentCountGauge(tableNameWithType);
+    }
+  }
+
   /**
    * Computes the number of missing segments based on the given existing segment intervals and the expected frequency
    * of the intervals.
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 53291b3..237924a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -124,6 +124,16 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
     }
   }
 
+  @Override
+  protected void nonLeaderCleanup(List<String> tableNamesWithType) {
+    for (String tableNameWithType : tableNamesWithType) {
+      // Non-REALTIME tables are skipped; only the REALTIME total document count gauge is dropped here.
+      if (TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.REALTIME) {
+        _validationMetrics.cleanupTotalDocumentCountGauge(tableNameWithType);
+      }
+    }
+  }
+
   @VisibleForTesting
   static long computeRealtimeTotalDocumentInSegments(List<SegmentZKMetadata> segmentsZKMetadata,
       boolean countHLCSegments) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org