You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sa...@apache.org on 2023/01/12 02:10:04 UTC
[pinot] branch master updated: Simplify delay tracking in ingestion delay metric code (#10101)
This is an automated email from the ASF dual-hosted git repository.
sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new bfdc74802c Simplify delay tracking in ingestion delay metric code (#10101)
bfdc74802c is described below
commit bfdc74802ca7b2b3744fe33dd026ab09d56a79a8
Author: Juan Gomez <ju...@linkedin.com>
AuthorDate: Wed Jan 11 18:09:57 2023 -0800
Simplify delay tracking in ingestion delay metric code (#10101)
---
.../manager/realtime/IngestionDelayTracker.java | 122 ++++++++++-----------
.../realtime/LLRealtimeSegmentDataManager.java | 8 +-
.../manager/realtime/RealtimeTableDataManager.java | 9 +-
.../realtime/IngestionDelayTrackerTest.java | 68 ++++++------
.../local/utils/tablestate/TableStateUtils.java | 2 +-
.../starter/helix/HelixInstanceDataManager.java | 6 +-
6 files changed, 102 insertions(+), 113 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
index 3031aee91b..43e72939e3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
@@ -22,6 +22,8 @@ package org.apache.pinot.core.data.manager.realtime;
import com.google.common.annotations.VisibleForTesting;
import java.time.Clock;
import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
@@ -35,23 +37,20 @@ import org.slf4j.LoggerFactory;
/**
- * A Class to track realtime ingestion delay for a given table on a given server.
+ * A Class to track realtime ingestion delay for table partitions on a given server.
* Highlights:
* 1-An object of this class is hosted by each RealtimeTableDataManager.
* 2-The object tracks ingestion delays for all partitions hosted by the current server for the given Realtime table.
* 3-Partition delays are updated by all LLRealtimeSegmentDataManager objects hosted in the corresponding
* RealtimeTableDataManager.
- * 4-A Metric is derived from reading the maximum tracked by this class. In addition, individual metrics are associated
- * with each partition being tracked.
- * 5-Delays reported for partitions that do not have events to consume are reported as zero.
- * 6-We track the time at which each delay sample was collected so that delay can be increased when partition stops
- * consuming for any reason other than no events being available for consumption.
- * 7-Partitions whose Segments go from CONSUMING to DROPPED state stop being tracked so their delays do not cloud
+ * 4-Individual metrics are associated with each partition being tracked.
+ * 5-Delays for partitions that do not have events to consume are reported as zero.
+ * 6-Partitions whose Segments go from CONSUMING to DROPPED state stop being tracked so their delays do not cloud
* delays of active partitions.
- * 8-When a segment goes from CONSUMING to ONLINE, we start a timeout for the corresponding partition.
+ * 7-When a segment goes from CONSUMING to ONLINE, we start a timeout for the corresponding partition.
* If no consumption is noticed after the timeout, we then read ideal state to confirm the server still hosts the
* partition. If not, we stop tracking the respective partition.
- * 9-A timer thread is started by this object to track timeouts of partitions and drive the reading of their ideal
+ * 8-A timer thread is started by this object to track timeouts of partitions and drive the reading of their ideal
* state.
*
* The following diagram illustrates the object interactions with main external APIs
@@ -85,26 +84,12 @@ public class IngestionDelayTracker {
private static final int INITIAL_TIMER_THREAD_DELAY_MS = 100;
private static final Logger _logger = LoggerFactory.getLogger(IngestionDelayTracker.class.getSimpleName());
- /*
- * Class to keep an ingestion delay measure and the time when the sample was taken (i.e. sample time)
- * We will use the sample time to increase ingestion delay when a partition stops consuming: the time
- * difference between the sample time and current time will be added to the metric when read.
- */
- static private class DelayMeasure {
- public DelayMeasure(long t, long d) {
- _delayMs = d;
- _sampleTime = t;
- }
- public final long _delayMs;
- public final long _sampleTime;
- }
-
- // HashMap used to store delay measures for all partitions active for the current table.
- private final ConcurrentHashMap<Integer, DelayMeasure> _partitionToDelaySampleMap = new ConcurrentHashMap<>();
+ // HashMap used to store ingestion time measures for all partitions active for the current table.
+ private final Map<Integer, Long> _partitionToIngestionTimeMsMap = new ConcurrentHashMap<>();
// We mark partitions that go from CONSUMING to ONLINE in _partitionsMarkedForVerification: if they do not
- // go back to CONSUMING in some period of time, we confirm whether they are still hosted in this server by reading
+ // go back to CONSUMING in some period of time, we verify whether they are still hosted in this server by reading
// ideal state. This is done with the goal of minimizing reading ideal state for efficiency reasons.
- private final ConcurrentHashMap<Integer, Long> _partitionsMarkedForVerification = new ConcurrentHashMap<>();
+ private final Map<Integer, Long> _partitionsMarkedForVerification = new ConcurrentHashMap<>();
final int _timerThreadTickIntervalMs;
// Timer task to check partitions that are inactive against ideal state.
@@ -137,11 +122,11 @@ public class IngestionDelayTracker {
_timerThreadTickIntervalMs = timerThreadTickIntervalMs;
_timer = new Timer("IngestionDelayTimerThread-" + TableNameBuilder.extractRawTableName(tableNameWithType));
_timer.schedule(new TimerTask() {
- @Override
- public void run() {
- timeoutInactivePartitions();
- }
- }, INITIAL_TIMER_THREAD_DELAY_MS, _timerThreadTickIntervalMs);
+ @Override
+ public void run() {
+ timeoutInactivePartitions();
+ }
+ }, INITIAL_TIMER_THREAD_DELAY_MS, _timerThreadTickIntervalMs);
}
public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithType,
@@ -151,20 +136,20 @@ public class IngestionDelayTracker {
}
/*
- * Helper function to age a delay measure. Aging means adding the time elapsed since the measure was
- * taken till the measure is being reported.
- *
- * @param currentDelay original sample delay to which we will add the age of the measure.
+ * Helper function to get the ingestion delay for a given ingestion time.
+ * Ingestion delay == Current Time - Ingestion Time
+ *
+ * @param ingestionTimeMs original ingestion time in milliseconds.
*/
- private long getAgedDelay(DelayMeasure currentDelay) {
- if (currentDelay == null) {
+ private long getIngestionDelayMs(Long ingestionTimeMs) {
+ if (ingestionTimeMs == null) {
return 0; // return 0 when not initialized
}
- // Add age of measure to the reported value
- long measureAgeInMs = _clock.millis() - currentDelay._sampleTime;
+ // Compute aged delay for current partition
+ long agedIngestionDelayMs = _clock.millis() - ingestionTimeMs;
// Correct to zero for any time shifts due to NTP or time reset.
- measureAgeInMs = Math.max(measureAgeInMs, 0);
- return currentDelay._delayMs + measureAgeInMs;
+ agedIngestionDelayMs = Math.max(agedIngestionDelayMs, 0);
+ return agedIngestionDelayMs;
}
/*
@@ -174,7 +159,7 @@ public class IngestionDelayTracker {
* @param partitionGroupId partition ID which we should stop tracking.
*/
private void removePartitionId(int partitionGroupId) {
- _partitionToDelaySampleMap.remove(partitionGroupId);
+ _partitionToIngestionTimeMsMap.remove(partitionGroupId);
// If we are removing a partition we should stop reading its ideal state.
_partitionsMarkedForVerification.remove(partitionGroupId);
_serverMetrics.removePartitionGauge(_metricName, partitionGroupId, ServerGauge.REALTIME_INGESTION_DELAY_MS);
@@ -184,9 +169,9 @@ public class IngestionDelayTracker {
* Helper function that creates a list of all the partitions that are marked for verification and whose
* timeouts are expired. This helps us optimize checks of the ideal state.
*/
- private ArrayList<Integer> getPartitionsToBeVerified() {
- ArrayList<Integer> partitionsToVerify = new ArrayList<>();
- for (ConcurrentHashMap.Entry<Integer, Long> entry : _partitionsMarkedForVerification.entrySet()) {
+ private List<Integer> getPartitionsToBeVerified() {
+ List<Integer> partitionsToVerify = new ArrayList<>();
+ for (Map.Entry<Integer, Long> entry : _partitionsMarkedForVerification.entrySet()) {
long timeMarked = _clock.millis() - entry.getValue();
if (timeMarked > PARTITION_TIMEOUT_MS) {
// Partition must be verified
@@ -208,25 +193,24 @@ public class IngestionDelayTracker {
}
/*
- * Called by LLRealTimeSegmentDataManagers to post delay updates to this tracker class.
+ * Called by LLRealTimeSegmentDataManagers to post ingestion time updates to this tracker class.
*
- * @param delayMs ingestion delay being recorded.
- * @param sampleTime sample time.
- * @param partitionGroupId partition ID for which this delay is being recorded.
+ * @param ingestionTimeMs ingestion time being recorded.
+ * @param partitionGroupId partition ID for which this ingestion time is being recorded.
*/
- public void updateIngestionDelay(long delayMs, long sampleTime, int partitionGroupId) {
+ public void updateIngestionDelay(long ingestionTimeMs, int partitionGroupId) {
// Store new measure and wipe old one for this partition
// TODO: see if we can install gauges after the server is ready.
if (!_isServerReadyToServeQueries.get()) {
// Do not update the ingestion delay metrics during server startup period
return;
}
- DelayMeasure previousMeasure = _partitionToDelaySampleMap.put(partitionGroupId,
- new DelayMeasure(sampleTime, delayMs));
+ Long previousMeasure = _partitionToIngestionTimeMsMap.put(partitionGroupId,
+ ingestionTimeMs);
if (previousMeasure == null) {
// First time we start tracking a partition we should start tracking it via metric
_serverMetrics.addCallbackPartitionGaugeIfNeeded(_metricName, partitionGroupId,
- ServerGauge.REALTIME_INGESTION_DELAY_MS, () -> getPartitionIngestionDelay(partitionGroupId));
+ ServerGauge.REALTIME_INGESTION_DELAY_MS, () -> getPartitionIngestionDelayMs(partitionGroupId));
}
// If we are consuming we do not need to track this partition for removal.
_partitionsMarkedForVerification.remove(partitionGroupId);
@@ -249,18 +233,23 @@ public class IngestionDelayTracker {
* This call is to be invoked by a timer thread that will periodically wake up and invoke this function.
*/
public void timeoutInactivePartitions() {
- Set<Integer> partitionsHostedByThisServer = null;
+ if (!_isServerReadyToServeQueries.get()) {
+ // Do not update the tracker state during server startup period
+ return;
+ }
// Check if we have any partition to verify, else don't make the call to check ideal state as that
// involves network traffic and may be inefficient.
- ArrayList<Integer> partitionsToVerify = getPartitionsToBeVerified();
+ List<Integer> partitionsToVerify = getPartitionsToBeVerified();
if (partitionsToVerify.size() == 0) {
// Don't make the call to getHostedPartitionsGroupIds() as it involves checking ideal state.
return;
}
+ Set<Integer> partitionsHostedByThisServer;
try {
partitionsHostedByThisServer = _realTimeTableDataManager.getHostedPartitionsGroupIds();
} catch (Exception e) {
- _logger.error("Failed to get partitions hosted by this server, table={}", _tableNameWithType);
+ _logger.error("Failed to get partitions hosted by this server, table={}, exception={}:{}", _tableNameWithType,
+ e.getClass(), e.getMessage());
return;
}
for (int partitionGroupId : partitionsToVerify) {
@@ -278,6 +267,10 @@ public class IngestionDelayTracker {
* @param partitionGroupId Partition id that we need confirmed via ideal state as still hosted by this server.
*/
public void markPartitionForVerification(int partitionGroupId) {
+ if (!_isServerReadyToServeQueries.get()) {
+ // Do not update the tracker state during server startup period
+ return;
+ }
_partitionsMarkedForVerification.put(partitionGroupId, _clock.millis());
}
@@ -288,9 +281,10 @@ public class IngestionDelayTracker {
*
* @return ingestion delay in milliseconds for the given partition ID.
*/
- public long getPartitionIngestionDelay(int partitionGroupId) {
- DelayMeasure currentMeasure = _partitionToDelaySampleMap.get(partitionGroupId);
- return getAgedDelay(currentMeasure);
+ public long getPartitionIngestionDelayMs(int partitionGroupId) {
+ // Not protected as this will only be invoked when metric is installed which happens after server ready
+ Long currentMeasure = _partitionToIngestionTimeMsMap.get(partitionGroupId);
+ return getIngestionDelayMs(currentMeasure);
}
/*
@@ -299,9 +293,13 @@ public class IngestionDelayTracker {
*/
public void shutdown() {
// Now that segments can't report metric, destroy metric for this table
- _timer.cancel();
+ _timer.cancel(); // Timer is installed in constructor so must always be cancelled
+ if (!_isServerReadyToServeQueries.get()) {
+ // Do not update the tracker state during server startup period
+ return;
+ }
// Remove partitions so their related metrics get uninstalled.
- for (ConcurrentHashMap.Entry<Integer, DelayMeasure> entry : _partitionToDelaySampleMap.entrySet()) {
+ for (Map.Entry<Integer, Long> entry : _partitionToIngestionTimeMsMap.entrySet()) {
removePartitionId(entry.getKey());
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index fe7c0f30b1..45570b59a4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -614,11 +614,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
indexedMessageCount, streamMessageCount, _currentOffset);
}
} else if (!prematureExit) {
+ // Record Pinot ingestion delay as zero since we are up-to-date and there are no new events
+ _realtimeTableDataManager.updateIngestionDelay(System.currentTimeMillis(), _partitionGroupId);
if (_segmentLogger.isDebugEnabled()) {
_segmentLogger.debug("empty batch received - sleeping for {}ms", idlePipeSleepTimeMillis);
}
- // Record Pinot ingestion delay as zero since we are up-to-date and no new events
- _realtimeTableDataManager.updateIngestionDelay(0, System.currentTimeMillis(), _partitionGroupId);
// If there were no messages to be fetched from stream, wait for a little bit as to avoid hammering the stream
Uninterruptibles.sleepUninterruptibly(idlePipeSleepTimeMillis, TimeUnit.MILLISECONDS);
}
@@ -1569,10 +1569,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
*/
private void updateIngestionDelay(int indexedMessageCount) {
if ((indexedMessageCount > 0) && (_lastRowMetadata != null)) {
- long ingestionDelayMs = _lastConsumedTimestampMs - _lastRowMetadata.getRecordIngestionTimeMs();
- ingestionDelayMs = Math.max(ingestionDelayMs, 0);
// Record Ingestion delay for this partition
- _realtimeTableDataManager.updateIngestionDelay(ingestionDelayMs, _lastConsumedTimestampMs,
+ _realtimeTableDataManager.updateIngestionDelay(_lastRowMetadata.getRecordIngestionTimeMs(),
_partitionGroupId);
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 0527aac163..4ae54b6bbc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -237,12 +237,11 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
/*
* Method used by LLRealtimeSegmentManagers to update their partition delays
*
- * @param ingestionDelayMs Ingestion delay being reported.
- * @param currentTimeMs Timestamp of the measure being provided, i.e. when this delay was computed.
+ * @param ingestionTimeMs Ingestion time (in milliseconds) being reported.
* @param partitionGroupId Partition ID for which delay is being updated.
*/
- public void updateIngestionDelay(long ingestionDelayMs, long currenTimeMs, int partitionGroupId) {
- _ingestionDelayTracker.updateIngestionDelay(ingestionDelayMs, currenTimeMs, partitionGroupId);
+ public void updateIngestionDelay(long ingestionTimeMs, int partitionGroupId) {
+ _ingestionDelayTracker.updateIngestionDelay(ingestionTimeMs, partitionGroupId);
}
/*
@@ -272,7 +271,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
/**
* Returns all partitionGroupIds for the partitions hosted by this server for current table.
- * @Note: this involves Zookeeper read and should not be used frequently due to efficiency concerns.
+ * @apiNote this involves Zookeeper read and should not be used frequently due to efficiency concerns.
*/
public Set<Integer> getHostedPartitionsGroupIds() {
Set<Integer> partitionsHostedByThisServer = new HashSet<>();
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
index b91d714e11..e2d19fc0ac 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
@@ -39,7 +39,7 @@ public class IngestionDelayTrackerTest {
new IngestionDelayTracker(serverMetrics, "dummyTable_RT",
realtimeTableDataManager, () -> true);
// With no samples, the time reported must be zero
- Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(0), 0);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 0);
return ingestionDelayTracker;
}
@@ -51,21 +51,22 @@ public class IngestionDelayTrackerTest {
IngestionDelayTracker ingestionDelayTracker =
new IngestionDelayTracker(serverMetrics, "dummyTable_RT",
realtimeTableDataManager, () -> true);
- Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(0), 0);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 0);
ingestionDelayTracker.shutdown();
// Test constructor with timer arguments
ingestionDelayTracker =
new IngestionDelayTracker(serverMetrics, "dummyTable_RT",
realtimeTableDataManager, TIMER_THREAD_TICK_INTERVAL_MS, () -> true);
- Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(0), 0);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 0);
// Test bad timer args to the constructor
try {
- ingestionDelayTracker =
- new IngestionDelayTracker(serverMetrics, "dummyTable_RT",
+ new IngestionDelayTracker(serverMetrics, "dummyTable_RT",
realtimeTableDataManager, 0, () -> true);
- Assert.assertTrue(false); // Constructor must assert
+ Assert.fail("Must have asserted due to invalid arguments"); // Constructor must assert
} catch (Exception e) {
- Assert.assertTrue(e instanceof RuntimeException);
+ if ((e instanceof NullPointerException) || !(e instanceof RuntimeException)) {
+ Assert.fail(String.format("Unexpected exception: %s:%s", e.getClass(), e.getMessage()));
+ }
}
}
@@ -84,29 +85,29 @@ public class IngestionDelayTrackerTest {
// Test we follow a single partition up and down
for (long i = 0; i <= maxTestDelay; i++) {
- ingestionDelayTracker.updateIngestionDelay(i, 0, partition0);
- Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0), i + clock.millis());
+ ingestionDelayTracker.updateIngestionDelay(i, partition0);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), clock.millis() - i);
}
// Test tracking down a measure for a given partition
for (long i = maxTestDelay; i >= 0; i--) {
- ingestionDelayTracker.updateIngestionDelay(i, 0, partition0);
- Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0), i + clock.millis());
+ ingestionDelayTracker.updateIngestionDelay(i, partition0);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), clock.millis() - i);
}
// Make the current partition maximum
- ingestionDelayTracker.updateIngestionDelay(maxTestDelay, 0, partition0);
+ ingestionDelayTracker.updateIngestionDelay(maxTestDelay, partition0);
// Bring up partition1 delay up and verify values
for (long i = 0; i <= 2 * maxTestDelay; i++) {
- ingestionDelayTracker.updateIngestionDelay(i, 0, partition1);
- Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition1), i + clock.millis());
+ ingestionDelayTracker.updateIngestionDelay(i, partition1);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1), clock.millis() - i);
}
// Bring down values of partition1 and verify values
for (long i = 2 * maxTestDelay; i >= 0; i--) {
- ingestionDelayTracker.updateIngestionDelay(i, 0, partition1);
- Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition1), i + clock.millis());
+ ingestionDelayTracker.updateIngestionDelay(i, partition1);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1), clock.millis() - i);
}
ingestionDelayTracker.shutdown();
@@ -121,12 +122,8 @@ public class IngestionDelayTrackerTest {
final long partition0Offset0Ms = 300;
final long partition0Offset1Ms = 1000;
final int partition1 = 1;
- final long partition1Delay0 = 11; // Record something slightly higher than previous max
- final long partition1Delay1 = 8; // Record something lower so that partition0 is the current max again
+ final long partition1Delay0 = 11;
final long partition1Offset0Ms = 150;
- final long partition1Offset1Ms = 450;
-
- final long sleepMs = 500;
IngestionDelayTracker ingestionDelayTracker = createTracker();
@@ -135,31 +132,28 @@ public class IngestionDelayTrackerTest {
ZoneId zoneId = ZoneId.systemDefault();
Clock clock = Clock.fixed(now, zoneId);
ingestionDelayTracker.setClock(clock);
- ingestionDelayTracker.updateIngestionDelay(partition0Delay0, clock.millis(), partition0);
- Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0), partition0Delay0);
+ ingestionDelayTracker.updateIngestionDelay(clock.millis() - partition0Delay0, partition0);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), partition0Delay0);
// Advance clock and test aging
Clock offsetClock = Clock.offset(clock, Duration.ofMillis(partition0Offset0Ms));
ingestionDelayTracker.setClock(offsetClock);
- Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0),
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
(partition0Delay0 + partition0Offset0Ms));
- // Add a new value below max and verify we are tracking the new max correctly
- ingestionDelayTracker.updateIngestionDelay(partition0Delay1, offsetClock.millis(), partition0);
- Clock partition0LastUpdate = Clock.offset(offsetClock, Duration.ZERO); // Save this as we need to verify aging later
- Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0), partition0Delay1);
+ ingestionDelayTracker.updateIngestionDelay(offsetClock.millis() - partition0Delay1, partition0);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), partition0Delay1);
// Add some offset to the last sample and make sure we age that measure properly
offsetClock = Clock.offset(offsetClock, Duration.ofMillis(partition0Offset1Ms));
ingestionDelayTracker.setClock(offsetClock);
- Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0),
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
(partition0Delay1 + partition0Offset1Ms));
- // Now try setting a new maximum in another partition
- ingestionDelayTracker.updateIngestionDelay(partition1Delay0, offsetClock.millis(), partition1);
- Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition1), partition1Delay0);
+ ingestionDelayTracker.updateIngestionDelay(offsetClock.millis() - partition1Delay0, partition1);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1), partition1Delay0);
// Add some offset to the last sample and make sure we age that measure properly
offsetClock = Clock.offset(offsetClock, Duration.ofMillis(partition1Offset0Ms));
ingestionDelayTracker.setClock(offsetClock);
- Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition1),
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1),
(partition1Delay0 + partition1Offset0Ms));
ingestionDelayTracker.shutdown();
@@ -179,16 +173,16 @@ public class IngestionDelayTrackerTest {
// Record a number of partitions with delay equal to partition id
for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay; partitionGroupId++) {
- ingestionDelayTracker.updateIngestionDelay(partitionGroupId, clock.millis(), partitionGroupId);
- Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partitionGroupId), partitionGroupId);
+ ingestionDelayTracker.updateIngestionDelay(clock.millis() - partitionGroupId, partitionGroupId);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionGroupId), partitionGroupId);
}
// Verify that as we remove partitions the next available maximum takes over
for (int partitionGroupId = maxPartition; partitionGroupId >= 0; partitionGroupId--) {
- ingestionDelayTracker.stopTrackingPartitionIngestionDelay((int) partitionGroupId);
+ ingestionDelayTracker.stopTrackingPartitionIngestionDelay(partitionGroupId);
}
for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay; partitionGroupId++) {
// Untracked partitions must return 0
- Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partitionGroupId), 0);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionGroupId), 0);
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
index 67be761616..c11886f613 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
@@ -42,7 +42,7 @@ public class TableStateUtils {
* Returns all segments in a given state for a given table.
*
* @param helixManager instance of Helix manager
- * @param tableNameWithType table for which we are obtaining ONLINE segments
+ * @param tableNameWithType table for which we are obtaining segments in a given state
* @param state state of the segments to be returned
*
* @return List of segment names in a given state.
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index a52d627aac..c4eb0d3e6b 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -90,7 +90,7 @@ public class HelixInstanceDataManager implements InstanceDataManager {
private ServerMetrics _serverMetrics;
private ZkHelixPropertyStore<ZNRecord> _propertyStore;
private SegmentUploader _segmentUploader;
- private Supplier<Boolean> _isReadyToServeQueries = () -> true;
+ private Supplier<Boolean> _isServerReadyToServeQueries = () -> false;
// Fixed size LRU cache for storing last N errors on the instance.
// Key is TableNameWithType-SegmentName pair.
@@ -98,7 +98,7 @@ public class HelixInstanceDataManager implements InstanceDataManager {
@Override
public void setSupplierOfIsServerReadyToServeQueries(Supplier<Boolean> isServingQueries) {
- _isReadyToServeQueries = isServingQueries;
+ _isServerReadyToServeQueries = isServingQueries;
}
@Override
@@ -195,7 +195,7 @@ public class HelixInstanceDataManager implements InstanceDataManager {
TableDataManagerConfig tableDataManagerConfig = new TableDataManagerConfig(_instanceDataManagerConfig, tableConfig);
TableDataManager tableDataManager =
TableDataManagerProvider.getTableDataManager(tableDataManagerConfig, _instanceId, _propertyStore,
- _serverMetrics, _helixManager, _errorCache, _isReadyToServeQueries);
+ _serverMetrics, _helixManager, _errorCache, _isServerReadyToServeQueries);
tableDataManager.start();
LOGGER.info("Created table data manager for table: {}", tableNameWithType);
return tableDataManager;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org