You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2024/02/02 18:11:54 UTC

(pinot) branch master updated: Release all segments of a table in releaseAndRemoveAllSegments method (#12297)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 87c089f8eb Release all segments of a table in releaseAndRemoveAllSegments method (#12297)
87c089f8eb is described below

commit 87c089f8eb4ea019231ac5a5600aaf12f591a20d
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Fri Feb 2 10:11:45 2024 -0800

    Release all segments of a table in releaseAndRemoveAllSegments method (#12297)
---
 .../pinot/core/data/manager/BaseTableDataManager.java | 19 +++++++++++++++++--
 .../starter/helix/HelixInstanceDataManager.java       | 19 +++++++++++++++++--
 2 files changed, 34 insertions(+), 4 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 119bede805..191ea04a0d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -223,8 +224,22 @@ public abstract class BaseTableDataManager implements TableDataManager {
       segmentDataManagers = new ArrayList<>(_segmentDataManagerMap.values());
       _segmentDataManagerMap.clear();
     }
-    for (SegmentDataManager segmentDataManager : segmentDataManagers) {
-      releaseSegment(segmentDataManager);
+    if (!segmentDataManagers.isEmpty()) {
+      int numThreads = Math.min(Runtime.getRuntime().availableProcessors(), segmentDataManagers.size());
+      ExecutorService stopExecutorService = Executors.newFixedThreadPool(numThreads);
+      for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+        stopExecutorService.submit(() -> releaseSegment(segmentDataManager));
+      }
+      stopExecutorService.shutdown();
+      try {
+        // Wait at most 10 minutes before exiting this method.
+        if (!stopExecutorService.awaitTermination(10, TimeUnit.MINUTES)) {
+          stopExecutorService.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        stopExecutorService.shutdownNow();
+        Thread.currentThread().interrupt();
+      }
     }
   }
 
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 8e147cc74a..f097c4103c 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -33,6 +33,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.function.Supplier;
@@ -203,8 +204,22 @@ public class HelixInstanceDataManager implements InstanceDataManager {
     if (_segmentPreloadExecutor != null) {
       _segmentPreloadExecutor.shutdownNow();
     }
-    for (TableDataManager tableDataManager : _tableDataManagerMap.values()) {
-      tableDataManager.shutDown();
+    if (!_tableDataManagerMap.isEmpty()) {
+      int numThreads = Math.min(Runtime.getRuntime().availableProcessors(), _tableDataManagerMap.size());
+      ExecutorService stopExecutorService = Executors.newFixedThreadPool(numThreads);
+      for (TableDataManager tableDataManager : _tableDataManagerMap.values()) {
+        stopExecutorService.submit(tableDataManager::shutDown);
+      }
+      stopExecutorService.shutdown();
+      try {
+        // Wait at most 10 minutes before exiting this method.
+        if (!stopExecutorService.awaitTermination(10, TimeUnit.MINUTES)) {
+          stopExecutorService.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        stopExecutorService.shutdownNow();
+        Thread.currentThread().interrupt();
+      }
     }
     SegmentBuildTimeLeaseExtender.shutdownExecutor();
     LOGGER.info("Helix instance data manager shut down");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org