You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/16 04:15:53 UTC

(pinot) branch master updated: Replace timer with scheduled executor service in IngestionDelayTracker (#11849)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 71e9c2cd25 Replace timer with scheduled executor service in IngestionDelayTracker (#11849)
71e9c2cd25 is described below

commit 71e9c2cd251f14ead5e46ff81e1f20557eba9536
Author: Shreyaa Sharma <66...@users.noreply.github.com>
AuthorDate: Thu Nov 16 09:45:47 2023 +0530

    Replace timer with scheduled executor service in IngestionDelayTracker (#11849)
---
 .../manager/realtime/IngestionDelayTracker.java    | 63 +++++++++++++---------
 1 file changed, 38 insertions(+), 25 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
index f0ae4c24ab..423f1f21cb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
@@ -25,9 +25,12 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -50,8 +53,8 @@ import org.slf4j.LoggerFactory;
  * 7-When a segment goes from CONSUMING to ONLINE, we start a timeout for the corresponding partition.
  *   If no consumption is noticed after the timeout, we then read ideal state to confirm the server still hosts the
  *   partition. If not, we stop tracking the respective partition.
- * 8-A timer thread is started by this object to track timeouts of partitions and drive the reading of their ideal
- *  state.
+ * 8-A scheduled executor thread is started by this object to track timeouts of partitions and drive the reading
+ * of their ideal state.
  *
  *  The following diagram illustrates the object interactions with main external APIs
  *
@@ -85,12 +88,12 @@ public class IngestionDelayTracker {
     private final long _ingestionTimeMs;
     private final long _firstStreamIngestionTimeMs;
   }
-  // Sleep interval for timer thread that triggers read of ideal state
-  private static final int TIMER_THREAD_TICK_INTERVAL_MS = 300000; // 5 minutes +/- precision in timeouts
+  // Sleep interval for scheduled executor service thread that triggers read of ideal state
+  private static final int SCHEDULED_EXECUTOR_THREAD_TICK_INTERVAL_MS = 300000; // 5 minutes +/- precision in timeouts
   // Once a partition is marked for verification, we wait 10 minutes to pull its ideal state.
   private static final int PARTITION_TIMEOUT_MS = 600000;          // 10 minutes timeouts
-  // Delay Timer thread for this amount of time after starting timer
-  private static final int INITIAL_TIMER_THREAD_DELAY_MS = 100;
+  // Delay scheduled executor service for this amount of time after starting service
+  private static final int INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS = 100;
   private static final Logger _logger = LoggerFactory.getLogger(IngestionDelayTracker.class.getSimpleName());
 
   // HashMap used to store ingestion time measures for all partitions active for the current table.
@@ -100,9 +103,10 @@ public class IngestionDelayTracker {
   // ideal state. This is done with the goal of minimizing reading ideal state for efficiency reasons.
   private final Map<Integer, Long> _partitionsMarkedForVerification = new ConcurrentHashMap<>();
 
-  final int _timerThreadTickIntervalMs;
-  // Timer task to check partitions that are inactive against ideal state.
-  private final Timer _timer;
+  final int _scheduledExecutorThreadTickIntervalMs;
+  // TODO: Make thread pool a server/cluster level config
+  // ScheduledExecutorService to check partitions that are inactive against ideal state.
+  private final ScheduledExecutorService _scheduledExecutor = Executors.newScheduledThreadPool(2);
 
   private final ServerMetrics _serverMetrics;
   private final String _tableNameWithType;
@@ -114,7 +118,7 @@ public class IngestionDelayTracker {
   private Clock _clock;
 
   public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithType,
-      RealtimeTableDataManager realtimeTableDataManager, int timerThreadTickIntervalMs,
+      RealtimeTableDataManager realtimeTableDataManager, int scheduledExecutorThreadTickIntervalMs,
       Supplier<Boolean> isServerReadyToServeQueries)
       throws RuntimeException {
     _serverMetrics = serverMetrics;
@@ -124,23 +128,32 @@ public class IngestionDelayTracker {
     _clock = Clock.systemUTC();
     _isServerReadyToServeQueries = isServerReadyToServeQueries;
     // Handle negative timer values
-    if (timerThreadTickIntervalMs <= 0) {
+    if (scheduledExecutorThreadTickIntervalMs <= 0) {
       throw new RuntimeException(String.format("Illegal timer timeout argument, expected > 0, got=%d for table=%s",
-          timerThreadTickIntervalMs, _tableNameWithType));
+              scheduledExecutorThreadTickIntervalMs, _tableNameWithType));
     }
-    _timerThreadTickIntervalMs = timerThreadTickIntervalMs;
-    _timer = new Timer("IngestionDelayTimerThread-" + TableNameBuilder.extractRawTableName(tableNameWithType));
-    _timer.schedule(new TimerTask() {
-        @Override
-        public void run() {
-          timeoutInactivePartitions();
-        }
-      }, INITIAL_TIMER_THREAD_DELAY_MS, _timerThreadTickIntervalMs);
+    _scheduledExecutorThreadTickIntervalMs = scheduledExecutorThreadTickIntervalMs;
+
+    // ThreadFactory to set the thread's name
+    ThreadFactory threadFactory = new ThreadFactory() {
+      private final ThreadFactory _defaultFactory = Executors.defaultThreadFactory();
+
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread thread = _defaultFactory.newThread(r);
+        thread.setName("IngestionDelayTimerThread-" + TableNameBuilder.extractRawTableName(tableNameWithType));
+        return thread;
+      }
+    };
+    ((ScheduledThreadPoolExecutor) _scheduledExecutor).setThreadFactory(threadFactory);
+
+    _scheduledExecutor.scheduleWithFixedDelay(this::timeoutInactivePartitions,
+            INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS, _scheduledExecutorThreadTickIntervalMs, TimeUnit.MILLISECONDS);
   }
 
   public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithType,
       RealtimeTableDataManager tableDataManager, Supplier<Boolean> isServerReadyToServeQueries) {
-    this(serverMetrics, tableNameWithType, tableDataManager, TIMER_THREAD_TICK_INTERVAL_MS,
+    this(serverMetrics, tableNameWithType, tableDataManager, SCHEDULED_EXECUTOR_THREAD_TICK_INTERVAL_MS,
         isServerReadyToServeQueries);
   }
 
@@ -255,7 +268,7 @@ public class IngestionDelayTracker {
    * This method is used for timing out inactive partitions, so we don't display their metrics on current server.
    * When the inactive time exceeds some threshold, we read from ideal state to confirm we still host the partition,
    * if not we remove the partition from being tracked locally.
-   * This call is to be invoked by a timer thread that will periodically wake up and invoke this function.
+   * This call is to be invoked by a scheduled executor thread that will periodically wake up and invoke this function.
    */
   public void timeoutInactivePartitions() {
     if (!_isServerReadyToServeQueries.get()) {
@@ -337,7 +350,7 @@ public class IngestionDelayTracker {
    */
   public void shutdown() {
     // Now that segments can't report metric, destroy metric for this table
-    _timer.cancel(); // Timer is installed in constructor so must always be cancelled
+    _scheduledExecutor.shutdown(); // ScheduledExecutor is installed in constructor so must always be shut down
     if (!_isServerReadyToServeQueries.get()) {
       // Do not update the tracker state during server startup period
       return;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org