You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sa...@apache.org on 2023/01/12 02:10:04 UTC

[pinot] branch master updated: Simplify delay tracking in ingestion delay metric code (#10101)

This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new bfdc74802c Simplify delay tracking in ingestion delay metric code (#10101)
bfdc74802c is described below

commit bfdc74802ca7b2b3744fe33dd026ab09d56a79a8
Author: Juan Gomez <ju...@linkedin.com>
AuthorDate: Wed Jan 11 18:09:57 2023 -0800

    Simplify delay tracking in ingestion delay metric code (#10101)
---
 .../manager/realtime/IngestionDelayTracker.java    | 122 ++++++++++-----------
 .../realtime/LLRealtimeSegmentDataManager.java     |   8 +-
 .../manager/realtime/RealtimeTableDataManager.java |   9 +-
 .../realtime/IngestionDelayTrackerTest.java        |  68 ++++++------
 .../local/utils/tablestate/TableStateUtils.java    |   2 +-
 .../starter/helix/HelixInstanceDataManager.java    |   6 +-
 6 files changed, 102 insertions(+), 113 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
index 3031aee91b..43e72939e3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
@@ -22,6 +22,8 @@ package org.apache.pinot.core.data.manager.realtime;
 import com.google.common.annotations.VisibleForTesting;
 import java.time.Clock;
 import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -35,23 +37,20 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * A Class to track realtime ingestion delay for a given table on a given server.
+ * A Class to track realtime ingestion delay for table partitions on a given server.
  * Highlights:
  * 1-An object of this class is hosted by each RealtimeTableDataManager.
  * 2-The object tracks ingestion delays for all partitions hosted by the current server for the given Realtime table.
  * 3-Partition delays are updated by all LLRealtimeSegmentDataManager objects hosted in the corresponding
  *   RealtimeTableDataManager.
- * 4-A Metric is derived from reading the maximum tracked by this class. In addition, individual metrics are associated
- *   with each partition being tracked.
- * 5-Delays reported for partitions that do not have events to consume are reported as zero.
- * 6-We track the time at which each delay sample was collected so that delay can be increased when partition stops
- *   consuming for any reason other than no events being available for consumption.
- * 7-Partitions whose Segments go from CONSUMING to DROPPED state stop being tracked so their delays do not cloud
+ * 4-Individual metrics are associated with each partition being tracked.
+ * 5-Delays for partitions that do not have events to consume are reported as zero.
+ * 6-Partitions whose Segments go from CONSUMING to DROPPED state stop being tracked so their delays do not cloud
  *   delays of active partitions.
- * 8-When a segment goes from CONSUMING to ONLINE, we start a timeout for the corresponding partition.
+ * 7-When a segment goes from CONSUMING to ONLINE, we start a timeout for the corresponding partition.
  *   If no consumption is noticed after the timeout, we then read ideal state to confirm the server still hosts the
  *   partition. If not, we stop tracking the respective partition.
- * 9-A timer thread is started by this object to track timeouts of partitions and drive the reading of their ideal
+ * 8-A timer thread is started by this object to track timeouts of partitions and drive the reading of their ideal
  *  state.
  *
  *  The following diagram illustrates the object interactions with main external APIs
@@ -85,26 +84,12 @@ public class IngestionDelayTracker {
   private static final int INITIAL_TIMER_THREAD_DELAY_MS = 100;
   private static final Logger _logger = LoggerFactory.getLogger(IngestionDelayTracker.class.getSimpleName());
 
-  /*
-   * Class to keep an ingestion delay measure and the time when the sample was taken (i.e. sample time)
-   * We will use the sample time to increase ingestion delay when a partition stops consuming: the time
-   * difference between the sample time and current time will be added to the metric when read.
-   */
-  static private class DelayMeasure {
-    public DelayMeasure(long t, long d) {
-      _delayMs = d;
-      _sampleTime = t;
-    }
-    public final long _delayMs;
-    public final long _sampleTime;
-  }
-
-  // HashMap used to store delay measures for all partitions active for the current table.
-  private final ConcurrentHashMap<Integer, DelayMeasure> _partitionToDelaySampleMap = new ConcurrentHashMap<>();
+  // Concurrent map used to store ingestion time measures for all partitions active for the current table.
+  private final Map<Integer, Long> _partitionToIngestionTimeMsMap = new ConcurrentHashMap<>();
   // We mark partitions that go from CONSUMING to ONLINE in _partitionsMarkedForVerification: if they do not
-  // go back to CONSUMING in some period of time, we confirm whether they are still hosted in this server by reading
+  // go back to CONSUMING in some period of time, we verify whether they are still hosted in this server by reading
   // ideal state. This is done with the goal of minimizing reading ideal state for efficiency reasons.
-  private final ConcurrentHashMap<Integer, Long> _partitionsMarkedForVerification = new ConcurrentHashMap<>();
+  private final Map<Integer, Long> _partitionsMarkedForVerification = new ConcurrentHashMap<>();
 
   final int _timerThreadTickIntervalMs;
   // Timer task to check partitions that are inactive against ideal state.
@@ -137,11 +122,11 @@ public class IngestionDelayTracker {
     _timerThreadTickIntervalMs = timerThreadTickIntervalMs;
     _timer = new Timer("IngestionDelayTimerThread-" + TableNameBuilder.extractRawTableName(tableNameWithType));
     _timer.schedule(new TimerTask() {
-      @Override
-      public void run() {
-        timeoutInactivePartitions();
-      }
-    }, INITIAL_TIMER_THREAD_DELAY_MS, _timerThreadTickIntervalMs);
+        @Override
+        public void run() {
+          timeoutInactivePartitions();
+        }
+      }, INITIAL_TIMER_THREAD_DELAY_MS, _timerThreadTickIntervalMs);
   }
 
   public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithType,
@@ -151,20 +136,20 @@ public class IngestionDelayTracker {
   }
 
   /*
-   * Helper function to age a delay measure. Aging means adding the time elapsed since the measure was
-   * taken till the measure is being reported.
-  *
-  * @param currentDelay original sample delay to which we will add the age of the measure.
+   * Helper function to get the ingestion delay for a given ingestion time.
+   * Ingestion delay == Current Time - Ingestion Time
+   *
+   * @param ingestionTimeMs original ingestion time in milliseconds.
    */
-  private long getAgedDelay(DelayMeasure currentDelay) {
-    if (currentDelay == null) {
+  private long getIngestionDelayMs(Long ingestionTimeMs) {
+    if (ingestionTimeMs == null) {
       return 0; // return 0 when not initialized
     }
-    // Add age of measure to the reported value
-    long measureAgeInMs = _clock.millis() - currentDelay._sampleTime;
+    // Compute aged delay for current partition
+    long agedIngestionDelayMs = _clock.millis() - ingestionTimeMs;
     // Correct to zero for any time shifts due to NTP or time reset.
-    measureAgeInMs = Math.max(measureAgeInMs, 0);
-    return currentDelay._delayMs + measureAgeInMs;
+    agedIngestionDelayMs = Math.max(agedIngestionDelayMs, 0);
+    return agedIngestionDelayMs;
   }
 
   /*
@@ -174,7 +159,7 @@ public class IngestionDelayTracker {
    * @param partitionGroupId partition ID which we should stop tracking.
    */
   private void removePartitionId(int partitionGroupId) {
-    _partitionToDelaySampleMap.remove(partitionGroupId);
+    _partitionToIngestionTimeMsMap.remove(partitionGroupId);
     // If we are removing a partition we should stop reading its ideal state.
     _partitionsMarkedForVerification.remove(partitionGroupId);
     _serverMetrics.removePartitionGauge(_metricName, partitionGroupId, ServerGauge.REALTIME_INGESTION_DELAY_MS);
@@ -184,9 +169,9 @@ public class IngestionDelayTracker {
    * Helper functions that creates a list of all the partitions that are marked for verification and whose
    * timeouts are expired. This helps us optimize checks of the ideal state.
    */
-  private ArrayList<Integer> getPartitionsToBeVerified() {
-    ArrayList<Integer> partitionsToVerify = new ArrayList<>();
-    for (ConcurrentHashMap.Entry<Integer, Long> entry : _partitionsMarkedForVerification.entrySet()) {
+  private List<Integer> getPartitionsToBeVerified() {
+    List<Integer> partitionsToVerify = new ArrayList<>();
+    for (Map.Entry<Integer, Long> entry : _partitionsMarkedForVerification.entrySet()) {
       long timeMarked = _clock.millis() - entry.getValue();
       if (timeMarked > PARTITION_TIMEOUT_MS) {
         // Partition must be verified
@@ -208,25 +193,24 @@ public class IngestionDelayTracker {
   }
 
   /*
-   * Called by LLRealTimeSegmentDataManagers to post delay updates to this tracker class.
+   * Called by LLRealtimeSegmentDataManagers to post ingestion time updates to this tracker class.
    *
-   * @param delayMs ingestion delay being recorded.
-   * @param sampleTime sample time.
-   * @param partitionGroupId partition ID for which this delay is being recorded.
+   * @param ingestionTimeMs ingestion time being recorded.
+   * @param partitionGroupId partition ID for which this ingestion time is being recorded.
    */
-  public void updateIngestionDelay(long delayMs, long sampleTime, int partitionGroupId) {
+  public void updateIngestionDelay(long ingestionTimeMs, int partitionGroupId) {
     // Store new measure and wipe old one for this partition
     // TODO: see if we can install gauges after the server is ready.
     if (!_isServerReadyToServeQueries.get()) {
       // Do not update the ingestion delay metrics during server startup period
       return;
     }
-    DelayMeasure previousMeasure = _partitionToDelaySampleMap.put(partitionGroupId,
-        new DelayMeasure(sampleTime, delayMs));
+    Long previousMeasure = _partitionToIngestionTimeMsMap.put(partitionGroupId,
+        ingestionTimeMs);
     if (previousMeasure == null) {
       // First time we start tracking a partition we should start tracking it via metric
       _serverMetrics.addCallbackPartitionGaugeIfNeeded(_metricName, partitionGroupId,
-          ServerGauge.REALTIME_INGESTION_DELAY_MS, () -> getPartitionIngestionDelay(partitionGroupId));
+          ServerGauge.REALTIME_INGESTION_DELAY_MS, () -> getPartitionIngestionDelayMs(partitionGroupId));
     }
     // If we are consuming we do not need to track this partition for removal.
     _partitionsMarkedForVerification.remove(partitionGroupId);
@@ -249,18 +233,23 @@ public class IngestionDelayTracker {
    * This call is to be invoked by a timer thread that will periodically wake up and invoke this function.
    */
   public void timeoutInactivePartitions() {
-    Set<Integer> partitionsHostedByThisServer = null;
+    if (!_isServerReadyToServeQueries.get()) {
+      // Do not update the tracker state during server startup period
+      return;
+    }
     // Check if we have any partition to verify, else don't make the call to check ideal state as that
     // involves network traffic and may be inefficient.
-    ArrayList<Integer> partitionsToVerify = getPartitionsToBeVerified();
+    List<Integer> partitionsToVerify = getPartitionsToBeVerified();
     if (partitionsToVerify.size() == 0) {
       // Don't make the call to getHostedPartitionsGroupIds() as it involves checking ideal state.
       return;
     }
+    Set<Integer> partitionsHostedByThisServer;
     try {
       partitionsHostedByThisServer = _realTimeTableDataManager.getHostedPartitionsGroupIds();
     } catch (Exception e) {
-      _logger.error("Failed to get partitions hosted by this server, table={}", _tableNameWithType);
+      _logger.error("Failed to get partitions hosted by this server, table={}, exception={}:{}", _tableNameWithType,
+          e.getClass(), e.getMessage());
       return;
     }
     for (int partitionGroupId : partitionsToVerify) {
@@ -278,6 +267,10 @@ public class IngestionDelayTracker {
    * @param partitionGroupId Partition id that we need confirmed via ideal state as still hosted by this server.
    */
   public void markPartitionForVerification(int partitionGroupId) {
+    if (!_isServerReadyToServeQueries.get()) {
+      // Do not update the tracker state during server startup period
+      return;
+    }
     _partitionsMarkedForVerification.put(partitionGroupId, _clock.millis());
   }
 
@@ -288,9 +281,10 @@ public class IngestionDelayTracker {
    *
    * @return ingestion delay in milliseconds for the given partition ID.
    */
-  public long getPartitionIngestionDelay(int partitionGroupId) {
-    DelayMeasure currentMeasure = _partitionToDelaySampleMap.get(partitionGroupId);
-    return getAgedDelay(currentMeasure);
+  public long getPartitionIngestionDelayMs(int partitionGroupId) {
+    // Not protected: only invoked once the metric is installed, which happens after the server is ready
+    Long currentMeasure = _partitionToIngestionTimeMsMap.get(partitionGroupId);
+    return getIngestionDelayMs(currentMeasure);
   }
 
   /*
@@ -299,9 +293,13 @@ public class IngestionDelayTracker {
    */
   public void shutdown() {
     // Now that segments can't report metric, destroy metric for this table
-    _timer.cancel();
+    _timer.cancel(); // Timer is installed in constructor so must always be cancelled
+    if (!_isServerReadyToServeQueries.get()) {
+      // Do not update the tracker state during server startup period
+      return;
+    }
     // Remove partitions so their related metrics get uninstalled.
-    for (ConcurrentHashMap.Entry<Integer, DelayMeasure> entry : _partitionToDelaySampleMap.entrySet()) {
+    for (Map.Entry<Integer, Long> entry : _partitionToIngestionTimeMsMap.entrySet()) {
       removePartitionId(entry.getKey());
     }
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index fe7c0f30b1..45570b59a4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -614,11 +614,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
             indexedMessageCount, streamMessageCount, _currentOffset);
       }
     } else if (!prematureExit) {
+      // Record Pinot ingestion delay as zero since we are up-to-date and there are no new events
+      _realtimeTableDataManager.updateIngestionDelay(System.currentTimeMillis(), _partitionGroupId);
       if (_segmentLogger.isDebugEnabled()) {
         _segmentLogger.debug("empty batch received - sleeping for {}ms", idlePipeSleepTimeMillis);
       }
-      // Record Pinot ingestion delay as zero since we are up-to-date and no new events
-      _realtimeTableDataManager.updateIngestionDelay(0, System.currentTimeMillis(), _partitionGroupId);
       // If there were no messages to be fetched from stream, wait for a little bit as to avoid hammering the stream
       Uninterruptibles.sleepUninterruptibly(idlePipeSleepTimeMillis, TimeUnit.MILLISECONDS);
     }
@@ -1569,10 +1569,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
    */
   private void updateIngestionDelay(int indexedMessageCount) {
     if ((indexedMessageCount > 0) && (_lastRowMetadata != null)) {
-      long ingestionDelayMs = _lastConsumedTimestampMs - _lastRowMetadata.getRecordIngestionTimeMs();
-      ingestionDelayMs = Math.max(ingestionDelayMs, 0);
       // Record Ingestion delay for this partition
-      _realtimeTableDataManager.updateIngestionDelay(ingestionDelayMs, _lastConsumedTimestampMs,
+      _realtimeTableDataManager.updateIngestionDelay(_lastRowMetadata.getRecordIngestionTimeMs(),
           _partitionGroupId);
     }
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 0527aac163..4ae54b6bbc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -237,12 +237,11 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
   /*
    * Method used by LLRealtimeSegmentManagers to update their partition delays
    *
-   * @param ingestionDelayMs Ingestion delay being reported.
-   * @param currentTimeMs Timestamp of the measure being provided, i.e. when this delay was computed.
+   * @param ingestionTimeMs Ingestion time (in milliseconds) being reported.
    * @param partitionGroupId Partition ID for which delay is being updated.
    */
-  public void updateIngestionDelay(long ingestionDelayMs, long currenTimeMs, int partitionGroupId) {
-    _ingestionDelayTracker.updateIngestionDelay(ingestionDelayMs, currenTimeMs, partitionGroupId);
+  public void updateIngestionDelay(long ingestionTimeMs, int partitionGroupId) {
+    _ingestionDelayTracker.updateIngestionDelay(ingestionTimeMs, partitionGroupId);
   }
 
   /*
@@ -272,7 +271,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
 
   /**
    * Returns all partitionGroupIds for the partitions hosted by this server for current table.
-   * @Note: this involves Zookeeper read and should not be used frequently due to efficiency concerns.
+   * @apiNote  this involves Zookeeper read and should not be used frequently due to efficiency concerns.
    */
   public Set<Integer> getHostedPartitionsGroupIds() {
     Set<Integer> partitionsHostedByThisServer = new HashSet<>();
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
index b91d714e11..e2d19fc0ac 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
@@ -39,7 +39,7 @@ public class IngestionDelayTrackerTest {
         new IngestionDelayTracker(serverMetrics, "dummyTable_RT",
             realtimeTableDataManager, () -> true);
     // With no samples, the time reported must be zero
-    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(0), 0);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 0);
     return ingestionDelayTracker;
   }
 
@@ -51,21 +51,22 @@ public class IngestionDelayTrackerTest {
     IngestionDelayTracker ingestionDelayTracker =
         new IngestionDelayTracker(serverMetrics, "dummyTable_RT",
             realtimeTableDataManager, () -> true);
-    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(0), 0);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 0);
     ingestionDelayTracker.shutdown();
     // Test constructor with timer arguments
     ingestionDelayTracker =
         new IngestionDelayTracker(serverMetrics, "dummyTable_RT",
             realtimeTableDataManager, TIMER_THREAD_TICK_INTERVAL_MS, () -> true);
-    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(0), 0);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 0);
     // Test bad timer args to the constructor
     try {
-      ingestionDelayTracker =
-          new IngestionDelayTracker(serverMetrics, "dummyTable_RT",
+      new IngestionDelayTracker(serverMetrics, "dummyTable_RT",
               realtimeTableDataManager, 0, () -> true);
-      Assert.assertTrue(false); // Constructor must assert
+      Assert.fail("Must have asserted due to invalid arguments"); // Constructor must assert
     } catch (Exception e) {
-      Assert.assertTrue(e instanceof RuntimeException);
+      if ((e instanceof NullPointerException) || !(e instanceof RuntimeException)) {
+        Assert.fail(String.format("Unexpected exception: %s:%s", e.getClass(), e.getMessage()));
+      }
     }
   }
 
@@ -84,29 +85,29 @@ public class IngestionDelayTrackerTest {
 
     // Test we follow a single partition up and down
     for (long i = 0; i <= maxTestDelay; i++) {
-      ingestionDelayTracker.updateIngestionDelay(i, 0, partition0);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0), i + clock.millis());
+      ingestionDelayTracker.updateIngestionDelay(i, partition0);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), clock.millis() - i);
     }
 
     // Test tracking down a measure for a given partition
     for (long i = maxTestDelay; i >= 0; i--) {
-      ingestionDelayTracker.updateIngestionDelay(i, 0, partition0);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0), i + clock.millis());
+      ingestionDelayTracker.updateIngestionDelay(i, partition0);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), clock.millis() - i);
     }
 
     // Make the current partition maximum
-    ingestionDelayTracker.updateIngestionDelay(maxTestDelay, 0, partition0);
+    ingestionDelayTracker.updateIngestionDelay(maxTestDelay, partition0);
 
     // Bring up partition1 delay up and verify values
     for (long i = 0; i <= 2 * maxTestDelay; i++) {
-      ingestionDelayTracker.updateIngestionDelay(i, 0, partition1);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition1), i + clock.millis());
+      ingestionDelayTracker.updateIngestionDelay(i, partition1);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1), clock.millis() - i);
     }
 
     // Bring down values of partition1 and verify values
     for (long i = 2 * maxTestDelay; i >= 0; i--) {
-      ingestionDelayTracker.updateIngestionDelay(i, 0, partition1);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition1), i + clock.millis());
+      ingestionDelayTracker.updateIngestionDelay(i, partition1);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1), clock.millis() - i);
     }
 
     ingestionDelayTracker.shutdown();
@@ -121,12 +122,8 @@ public class IngestionDelayTrackerTest {
     final long partition0Offset0Ms = 300;
     final long partition0Offset1Ms = 1000;
     final int partition1 = 1;
-    final long partition1Delay0 = 11; // Record something slightly higher than previous max
-    final long partition1Delay1 = 8;  // Record something lower so that partition0 is the current max again
+    final long partition1Delay0 = 11;
     final long partition1Offset0Ms = 150;
-    final long partition1Offset1Ms = 450;
-
-    final long sleepMs = 500;
 
     IngestionDelayTracker ingestionDelayTracker = createTracker();
 
@@ -135,31 +132,28 @@ public class IngestionDelayTrackerTest {
     ZoneId zoneId = ZoneId.systemDefault();
     Clock clock = Clock.fixed(now, zoneId);
     ingestionDelayTracker.setClock(clock);
-    ingestionDelayTracker.updateIngestionDelay(partition0Delay0, clock.millis(), partition0);
-    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0), partition0Delay0);
+    ingestionDelayTracker.updateIngestionDelay(clock.millis() - partition0Delay0, partition0);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), partition0Delay0);
     // Advance clock and test aging
     Clock offsetClock = Clock.offset(clock, Duration.ofMillis(partition0Offset0Ms));
     ingestionDelayTracker.setClock(offsetClock);
-    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0),
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
         (partition0Delay0 + partition0Offset0Ms));
 
-    // Add a new value below max and verify we are tracking the new max correctly
-    ingestionDelayTracker.updateIngestionDelay(partition0Delay1, offsetClock.millis(), partition0);
-    Clock partition0LastUpdate = Clock.offset(offsetClock, Duration.ZERO); // Save this as we need to verify aging later
-    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0), partition0Delay1);
+    ingestionDelayTracker.updateIngestionDelay(offsetClock.millis() - partition0Delay1, partition0);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), partition0Delay1);
     // Add some offset to the last sample and make sure we age that measure properly
     offsetClock = Clock.offset(offsetClock, Duration.ofMillis(partition0Offset1Ms));
     ingestionDelayTracker.setClock(offsetClock);
-    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0),
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
         (partition0Delay1 + partition0Offset1Ms));
 
-    // Now try setting a new maximum in another partition
-    ingestionDelayTracker.updateIngestionDelay(partition1Delay0, offsetClock.millis(), partition1);
-    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition1), partition1Delay0);
+    ingestionDelayTracker.updateIngestionDelay(offsetClock.millis() - partition1Delay0, partition1);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1), partition1Delay0);
     // Add some offset to the last sample and make sure we age that measure properly
     offsetClock = Clock.offset(offsetClock, Duration.ofMillis(partition1Offset0Ms));
     ingestionDelayTracker.setClock(offsetClock);
-    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition1),
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1),
         (partition1Delay0 + partition1Offset0Ms));
 
     ingestionDelayTracker.shutdown();
@@ -179,16 +173,16 @@ public class IngestionDelayTrackerTest {
 
     // Record a number of partitions with delay equal to partition id
     for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay; partitionGroupId++) {
-      ingestionDelayTracker.updateIngestionDelay(partitionGroupId, clock.millis(), partitionGroupId);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partitionGroupId), partitionGroupId);
+      ingestionDelayTracker.updateIngestionDelay(clock.millis() - partitionGroupId, partitionGroupId);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionGroupId), partitionGroupId);
     }
     // Verify that as we remove partitions the next available maximum takes over
     for (int partitionGroupId = maxPartition; partitionGroupId >= 0; partitionGroupId--) {
-      ingestionDelayTracker.stopTrackingPartitionIngestionDelay((int) partitionGroupId);
+      ingestionDelayTracker.stopTrackingPartitionIngestionDelay(partitionGroupId);
     }
     for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay; partitionGroupId++) {
       // Untracked partitions must return 0
-      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partitionGroupId), 0);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionGroupId), 0);
     }
   }
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
index 67be761616..c11886f613 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
@@ -42,7 +42,7 @@ public class TableStateUtils {
    * Returns all segments in a given state for a given table.
    *
    * @param helixManager instance of Helix manager
-   * @param tableNameWithType table for which we are obtaining ONLINE segments
+   * @param tableNameWithType table for which we are obtaining segments in a given state
    * @param state state of the segments to be returned
    *
    * @return List of segment names in a given state.
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index a52d627aac..c4eb0d3e6b 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -90,7 +90,7 @@ public class HelixInstanceDataManager implements InstanceDataManager {
   private ServerMetrics _serverMetrics;
   private ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private SegmentUploader _segmentUploader;
-  private Supplier<Boolean> _isReadyToServeQueries = () -> true;
+  private Supplier<Boolean> _isServerReadyToServeQueries = () -> false;
 
   // Fixed size LRU cache for storing last N errors on the instance.
   // Key is TableNameWithType-SegmentName pair.
@@ -98,7 +98,7 @@ public class HelixInstanceDataManager implements InstanceDataManager {
 
   @Override
   public void setSupplierOfIsServerReadyToServeQueries(Supplier<Boolean> isServingQueries) {
-    _isReadyToServeQueries = isServingQueries;
+    _isServerReadyToServeQueries = isServingQueries;
   }
 
   @Override
@@ -195,7 +195,7 @@ public class HelixInstanceDataManager implements InstanceDataManager {
     TableDataManagerConfig tableDataManagerConfig = new TableDataManagerConfig(_instanceDataManagerConfig, tableConfig);
     TableDataManager tableDataManager =
         TableDataManagerProvider.getTableDataManager(tableDataManagerConfig, _instanceId, _propertyStore,
-            _serverMetrics, _helixManager, _errorCache, _isReadyToServeQueries);
+            _serverMetrics, _helixManager, _errorCache, _isServerReadyToServeQueries);
     tableDataManager.start();
     LOGGER.info("Created table data manager for table: {}", tableNameWithType);
     return tableDataManager;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org