You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/16 04:15:53 UTC
(pinot) branch master updated: Replace timer with scheduled executor service in IngestionDelayTracker (#11849)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 71e9c2cd25 Replace timer with scheduled executor service in IngestionDelayTracker (#11849)
71e9c2cd25 is described below
commit 71e9c2cd251f14ead5e46ff81e1f20557eba9536
Author: Shreyaa Sharma <66...@users.noreply.github.com>
AuthorDate: Thu Nov 16 09:45:47 2023 +0530
Replace timer with scheduled executor service in IngestionDelayTracker (#11849)
---
.../manager/realtime/IngestionDelayTracker.java | 63 +++++++++++++---------
1 file changed, 38 insertions(+), 25 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
index f0ae4c24ab..423f1f21cb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
@@ -25,9 +25,12 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMetrics;
@@ -50,8 +53,8 @@ import org.slf4j.LoggerFactory;
* 7-When a segment goes from CONSUMING to ONLINE, we start a timeout for the corresponding partition.
* If no consumption is noticed after the timeout, we then read ideal state to confirm the server still hosts the
* partition. If not, we stop tracking the respective partition.
- * 8-A timer thread is started by this object to track timeouts of partitions and drive the reading of their ideal
- * state.
+ * 8-A scheduled executor thread is started by this object to track timeouts of partitions and drive the reading
+ * of their ideal state.
*
* The following diagram illustrates the object interactions with main external APIs
*
@@ -85,12 +88,12 @@ public class IngestionDelayTracker {
private final long _ingestionTimeMs;
private final long _firstStreamIngestionTimeMs;
}
- // Sleep interval for timer thread that triggers read of ideal state
- private static final int TIMER_THREAD_TICK_INTERVAL_MS = 300000; // 5 minutes +/- precision in timeouts
+ // Sleep interval for scheduled executor service thread that triggers read of ideal state
+ private static final int SCHEDULED_EXECUTOR_THREAD_TICK_INTERVAL_MS = 300000; // 5 minutes +/- precision in timeouts
// Once a partition is marked for verification, we wait 10 minutes to pull its ideal state.
private static final int PARTITION_TIMEOUT_MS = 600000; // 10 minutes timeouts
- // Delay Timer thread for this amount of time after starting timer
- private static final int INITIAL_TIMER_THREAD_DELAY_MS = 100;
+ // Delay scheduled executor service for this amount of time after starting service
+ private static final int INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS = 100;
private static final Logger _logger = LoggerFactory.getLogger(IngestionDelayTracker.class.getSimpleName());
// HashMap used to store ingestion time measures for all partitions active for the current table.
@@ -100,9 +103,10 @@ public class IngestionDelayTracker {
// ideal state. This is done with the goal of minimizing reading ideal state for efficiency reasons.
private final Map<Integer, Long> _partitionsMarkedForVerification = new ConcurrentHashMap<>();
- final int _timerThreadTickIntervalMs;
- // Timer task to check partitions that are inactive against ideal state.
- private final Timer _timer;
+ final int _scheduledExecutorThreadTickIntervalMs;
+ // TODO: Make thread pool a server/cluster level config
+ // ScheduledExecutorService to check partitions that are inactive against ideal state.
+ private final ScheduledExecutorService _scheduledExecutor = Executors.newScheduledThreadPool(2);
private final ServerMetrics _serverMetrics;
private final String _tableNameWithType;
@@ -114,7 +118,7 @@ public class IngestionDelayTracker {
private Clock _clock;
public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithType,
- RealtimeTableDataManager realtimeTableDataManager, int timerThreadTickIntervalMs,
+ RealtimeTableDataManager realtimeTableDataManager, int scheduledExecutorThreadTickIntervalMs,
Supplier<Boolean> isServerReadyToServeQueries)
throws RuntimeException {
_serverMetrics = serverMetrics;
@@ -124,23 +128,32 @@ public class IngestionDelayTracker {
_clock = Clock.systemUTC();
_isServerReadyToServeQueries = isServerReadyToServeQueries;
// Reject non-positive (zero or negative) tick interval values
- if (timerThreadTickIntervalMs <= 0) {
+ if (scheduledExecutorThreadTickIntervalMs <= 0) {
throw new RuntimeException(String.format("Illegal timer timeout argument, expected > 0, got=%d for table=%s",
- timerThreadTickIntervalMs, _tableNameWithType));
+ scheduledExecutorThreadTickIntervalMs, _tableNameWithType));
}
- _timerThreadTickIntervalMs = timerThreadTickIntervalMs;
- _timer = new Timer("IngestionDelayTimerThread-" + TableNameBuilder.extractRawTableName(tableNameWithType));
- _timer.schedule(new TimerTask() {
- @Override
- public void run() {
- timeoutInactivePartitions();
- }
- }, INITIAL_TIMER_THREAD_DELAY_MS, _timerThreadTickIntervalMs);
+ _scheduledExecutorThreadTickIntervalMs = scheduledExecutorThreadTickIntervalMs;
+
+ // ThreadFactory to set the thread's name
+ ThreadFactory threadFactory = new ThreadFactory() {
+ private final ThreadFactory _defaultFactory = Executors.defaultThreadFactory();
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread thread = _defaultFactory.newThread(r);
+ thread.setName("IngestionDelayTimerThread-" + TableNameBuilder.extractRawTableName(tableNameWithType));
+ return thread;
+ }
+ };
+ ((ScheduledThreadPoolExecutor) _scheduledExecutor).setThreadFactory(threadFactory);
+
+ _scheduledExecutor.scheduleWithFixedDelay(this::timeoutInactivePartitions,
+ INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS, _scheduledExecutorThreadTickIntervalMs, TimeUnit.MILLISECONDS);
}
public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithType,
RealtimeTableDataManager tableDataManager, Supplier<Boolean> isServerReadyToServeQueries) {
- this(serverMetrics, tableNameWithType, tableDataManager, TIMER_THREAD_TICK_INTERVAL_MS,
+ this(serverMetrics, tableNameWithType, tableDataManager, SCHEDULED_EXECUTOR_THREAD_TICK_INTERVAL_MS,
isServerReadyToServeQueries);
}
@@ -255,7 +268,7 @@ public class IngestionDelayTracker {
* This method is used for timing out inactive partitions, so we don't display their metrics on current server.
* When the inactive time exceeds some threshold, we read from ideal state to confirm we still host the partition,
* if not we remove the partition from being tracked locally.
- * This call is to be invoked by a timer thread that will periodically wake up and invoke this function.
+ * This call is to be invoked by a scheduled executor thread that will periodically wake up and invoke this function.
*/
public void timeoutInactivePartitions() {
if (!_isServerReadyToServeQueries.get()) {
@@ -337,7 +350,7 @@ public class IngestionDelayTracker {
*/
public void shutdown() {
// Now that segments can't report metric, destroy metric for this table
- _timer.cancel(); // Timer is installed in constructor so must always be cancelled
+ _scheduledExecutor.shutdown(); // ScheduledExecutor is started in the constructor so it must always be shut down
if (!_isServerReadyToServeQueries.get()) {
// Do not update the tracker state during server startup period
return;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org