You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/06/07 20:03:28 UTC

[pinot] branch master updated: refine segment reload executor to avoid creating threads unbounded (#10837)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b0d0d86a9 refine segment reload executor to avoid creating threads unbounded (#10837)
4b0d0d86a9 is described below

commit 4b0d0d86a9f6f29b4da92d320f52fc0880dabdfd
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Wed Jun 7 13:03:18 2023 -0700

    refine segment reload executor to avoid creating threads unbounded (#10837)
---
 .../pinot/common/utils/config/TierConfigUtils.java | 31 +++++++++++-----------
 .../pinot/common/tier/TierConfigUtilsTest.java     | 23 +++++-----------
 .../apache/pinot/controller/ControllerConf.java    |  1 -
 .../core/data/manager/BaseTableDataManager.java    | 14 +++++-----
 .../loader/TierBasedSegmentDirectoryLoader.java    | 14 +++++-----
 .../starter/helix/HelixInstanceDataManager.java    | 14 +++++-----
 6 files changed, 42 insertions(+), 55 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java
index d6c8eccf60..31e32113e7 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.common.utils.config;
 
-import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -63,10 +62,6 @@ public final class TierConfigUtils {
     return tierName == null ? "default" : tierName;
   }
 
-  public static String getDataDirForTier(TableConfig tableConfig, String tierName) {
-    return getDataDirForTier(tableConfig, tierName, Collections.emptyMap());
-  }
-
   /**
    * Consider configured tiers and compute default instance partitions for the segment
    *
@@ -93,6 +88,12 @@ public final class TierConfigUtils {
     return null;
   }
 
+  @Nullable
+  public static String getDataDirForTier(TableConfig tableConfig, String tierName) {
+    return getDataDirForTier(tableConfig, tierName, Collections.emptyMap());
+  }
+
+  @Nullable
   public static String getDataDirForTier(TableConfig tableConfig, String tierName,
       Map<String, Map<String, String>> instanceTierConfigs) {
     String tableNameWithType = tableConfig.getTableName();
@@ -108,17 +109,17 @@ public final class TierConfigUtils {
       }
       if (tierCfg != null) {
         Map<String, String> backendProps = tierCfg.getTierBackendProperties();
-        if (backendProps != null) {
-          dataDir = backendProps.get(CommonConstants.Tier.BACKEND_PROP_DATA_DIR);
-        } else {
+        if (backendProps == null) {
           LOGGER.debug("No backend props for tier: {} in TableConfig of table: {}", tierName, tableNameWithType);
-        }
-        if (StringUtils.isNotEmpty(dataDir)) {
-          LOGGER.debug("Got dataDir: {} for tier: {} in TableConfig of table: {}", dataDir, tierName,
-              tableNameWithType);
-          return dataDir;
         } else {
-          LOGGER.debug("No dataDir for tier: {} in TableConfig of table: {}", tierName, tableNameWithType);
+          dataDir = backendProps.get(CommonConstants.Tier.BACKEND_PROP_DATA_DIR);
+          if (StringUtils.isNotEmpty(dataDir)) {
+            LOGGER.debug("Got dataDir: {} for tier: {} in TableConfig of table: {}", dataDir, tierName,
+                tableNameWithType);
+            return dataDir;
+          } else {
+            LOGGER.debug("No dataDir for tier: {} in TableConfig of table: {}", tierName, tableNameWithType);
+          }
         }
       }
     }
@@ -128,8 +129,6 @@ public final class TierConfigUtils {
       // All instance config names are lower cased while being passed down here.
       dataDir = instanceCfgs.get(CommonConstants.Tier.BACKEND_PROP_DATA_DIR.toLowerCase());
     }
-    Preconditions.checkState(StringUtils.isNotEmpty(dataDir), "No dataDir for tier: %s for table: %s", tierName,
-        tableNameWithType);
     LOGGER.debug("Got dataDir: {} for tier: {} for table: {} in instance configs", dataDir, tierName,
         tableNameWithType);
     return dataDir;
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/tier/TierConfigUtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/tier/TierConfigUtilsTest.java
index 2b942a1cab..75200fcb4e 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/tier/TierConfigUtilsTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/tier/TierConfigUtilsTest.java
@@ -192,30 +192,21 @@ public class TierConfigUtilsTest {
   @Test
   public void testGetDataDirForTier() {
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build();
-    try {
-      TierConfigUtils.getDataDirForTier(tableConfig, "tier1");
-    } catch (Exception e) {
-      Assert.assertEquals(e.getMessage(), "No dataDir for tier: tier1 for table: myTable_OFFLINE");
-    }
+    String dataDir = TierConfigUtils.getDataDirForTier(tableConfig, "tier1");
+    Assert.assertNull(dataDir);
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTierConfigList(Lists
         .newArrayList(new TierConfig("myTier", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "10d", null,
             TierFactory.PINOT_SERVER_STORAGE_TYPE, "tag_OFFLINE", null, null))).build();
-    try {
-      TierConfigUtils.getDataDirForTier(tableConfig, "tier1");
-    } catch (Exception e) {
-      Assert.assertEquals(e.getMessage(), "No dataDir for tier: tier1 for table: myTable_OFFLINE");
-    }
-    try {
-      TierConfigUtils.getDataDirForTier(tableConfig, "myTier");
-    } catch (Exception e) {
-      Assert.assertEquals(e.getMessage(), "No dataDir for tier: myTier for table: myTable_OFFLINE");
-    }
+    dataDir = TierConfigUtils.getDataDirForTier(tableConfig, "tier1");
+    Assert.assertNull(dataDir);
+    dataDir = TierConfigUtils.getDataDirForTier(tableConfig, "myTier");
+    Assert.assertNull(dataDir);
     // Provide instance tierConfigs for the tier.
     Map<String, Map<String, String>> instanceTierConfigs = new HashMap<>();
     Map<String, String> tierCfgMap = new HashMap<>();
     tierCfgMap.put("datadir", "/abc/xyz");
     instanceTierConfigs.put("myTier", tierCfgMap);
-    String dataDir = TierConfigUtils.getDataDirForTier(tableConfig, "myTier", instanceTierConfigs);
+    dataDir = TierConfigUtils.getDataDirForTier(tableConfig, "myTier", instanceTierConfigs);
     Assert.assertEquals(dataDir, "/abc/xyz");
     // Table tierConfigs overwrite those from instance tierConfigs.
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTierConfigList(Lists
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 8595a434af..e519f2b8f8 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -242,7 +242,6 @@ public class ControllerConf extends PinotConfiguration {
 
     private static final int DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = 24 * 60 * 60;
     private static final int DEFAULT_SEGMENT_RELOCATOR_FREQUENCY_IN_SECONDS = 60 * 60;
-    private static final int DEFAULT_SEGMENT_TIER_ASSIGNER_FREQUENCY_IN_SECONDS = -1; // Disabled
 
     // Realtime Consumer Monitor
     private static final String RT_CONSUMER_MONITOR_FREQUENCY_PERIOD =
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 9bc2c905bb..882af138d4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -41,6 +41,7 @@ import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationConverter;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -685,16 +686,13 @@ public abstract class BaseTableDataManager implements TableDataManager {
     if (segmentTier == null) {
       return getSegmentDataDir(segmentName);
     }
-    try {
-      String tierDataDir =
-          TierConfigUtils.getDataDirForTier(tableConfig, segmentTier, _tableDataManagerConfig.getInstanceTierConfigs());
-      File tierTableDataDir = new File(tierDataDir, _tableNameWithType);
-      return new File(tierTableDataDir, segmentName);
-    } catch (Exception e) {
-      LOGGER.warn("Failed to get dataDir for segment: {} of table: {} on tier: {} due to error: {}", segmentName,
-          _tableNameWithType, segmentTier, e.getMessage());
+    String tierDataDir =
+        TierConfigUtils.getDataDirForTier(tableConfig, segmentTier, _tableDataManagerConfig.getInstanceTierConfigs());
+    if (StringUtils.isEmpty(tierDataDir)) {
       return getSegmentDataDir(segmentName);
     }
+    File tierTableDataDir = new File(tierDataDir, _tableNameWithType);
+    return new File(tierTableDataDir, segmentName);
   }
 
   @Nullable
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/TierBasedSegmentDirectoryLoader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/TierBasedSegmentDirectoryLoader.java
index e23e73df46..397aa79c56 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/TierBasedSegmentDirectoryLoader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/TierBasedSegmentDirectoryLoader.java
@@ -206,16 +206,14 @@ public class TierBasedSegmentDirectoryLoader implements SegmentDirectoryLoader {
     TableConfig tableConfig = loaderContext.getTableConfig();
     String tableNameWithType = tableConfig.getTableName();
     String segmentName = loaderContext.getSegmentName();
-    try {
-      String tierDataDir =
-          TierConfigUtils.getDataDirForTier(tableConfig, segmentTier, loaderContext.getInstanceTierConfigs());
-      File tierTableDataDir = new File(tierDataDir, tableNameWithType);
-      return new File(tierTableDataDir, segmentName);
-    } catch (Exception e) {
-      LOGGER.warn("Failed to get dataDir for segment: {} of table: {} on tier: {} due to error: {}", segmentName,
-          tableNameWithType, segmentTier, e.getMessage());
+    String tierDataDir =
+        TierConfigUtils.getDataDirForTier(tableConfig, segmentTier, loaderContext.getInstanceTierConfigs());
+    if (StringUtils.isEmpty(tierDataDir)) {
+      LOGGER.warn("No dataDir for segment: {} of table: {} on tier: {}", segmentName, tableNameWithType, segmentTier);
       return null;
     }
+    File tierTableDataDir = new File(tierDataDir, tableNameWithType);
+    return new File(tierTableDataDir, segmentName);
   }
 
   @Override
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 6bfcf09952..92a82f133b 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -97,6 +98,7 @@ public class HelixInstanceDataManager implements InstanceDataManager {
   // Fixed size LRU cache for storing last N errors on the instance.
   // Key is TableNameWithType-SegmentName pair.
   private LoadingCache<Pair<String, String>, SegmentErrorInfo> _errorCache;
+  private ExecutorService _segmentRefreshExecutor;
 
   @Override
   public void setSupplierOfIsServerReadyToServeQueries(Supplier<Boolean> isServingQueries) {
@@ -129,7 +131,10 @@ public class HelixInstanceDataManager implements InstanceDataManager {
 
     // Initialize segment build time lease extender executor
     SegmentBuildTimeLeaseExtender.initExecutor();
-
+    // Initialize a fixed thread pool to reload/refresh segments in parallel. getMaxParallelRefreshThreads() is also
+    // used to initialize the segment refresh semaphore that limits the parallelism, so create a pool of the same size.
+    _segmentRefreshExecutor = Executors.newFixedThreadPool(getMaxParallelRefreshThreads(),
+        new ThreadFactoryBuilder().setNameFormat("segment-refresh-thread-%d").build());
     // Initialize the table data manager provider
     TableDataManagerProvider.init(_instanceDataManagerConfig);
     LOGGER.info("Initialized Helix instance data manager");
@@ -184,6 +189,7 @@ public class HelixInstanceDataManager implements InstanceDataManager {
 
   @Override
   public synchronized void shutDown() {
+    _segmentRefreshExecutor.shutdownNow();
     for (TableDataManager tableDataManager : _tableDataManagerMap.values()) {
       tableDataManager.shutDown();
     }
@@ -366,7 +372,6 @@ public class HelixInstanceDataManager implements InstanceDataManager {
     Preconditions.checkNotNull(tableConfig);
     Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableNameWithType);
     List<String> failedSegments = new ArrayList<>();
-    ExecutorService workers = Executors.newCachedThreadPool();
     final AtomicReference<Exception> sampleException = new AtomicReference<>();
     //calling thread hasn't acquired any permit so we don't reload any segments using it.
     CompletableFuture.allOf(segmentsMetadata.stream().map(segmentMetadata -> CompletableFuture.runAsync(() -> {
@@ -383,10 +388,7 @@ public class HelixInstanceDataManager implements InstanceDataManager {
         failedSegments.add(segmentName);
         sampleException.set(e);
       }
-    }, workers)).toArray(CompletableFuture[]::new)).get();
-
-    workers.shutdownNow();
-
+    }, _segmentRefreshExecutor)).toArray(CompletableFuture[]::new)).get();
     if (sampleException.get() != null) {
       throw new RuntimeException(
           String.format("Failed to reload %d/%d segments: %s in table: %s", failedSegments.size(),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org