You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text rendering.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/06/07 20:03:28 UTC
[pinot] branch master updated: refine segment reload executor to avoid creating threads unbounded (#10837)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4b0d0d86a9 refine segment reload executor to avoid creating threads unbounded (#10837)
4b0d0d86a9 is described below
commit 4b0d0d86a9f6f29b4da92d320f52fc0880dabdfd
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Wed Jun 7 13:03:18 2023 -0700
refine segment reload executor to avoid creating threads unbounded (#10837)
---
.../pinot/common/utils/config/TierConfigUtils.java | 31 +++++++++++-----------
.../pinot/common/tier/TierConfigUtilsTest.java | 23 +++++-----------
.../apache/pinot/controller/ControllerConf.java | 1 -
.../core/data/manager/BaseTableDataManager.java | 14 +++++-----
.../loader/TierBasedSegmentDirectoryLoader.java | 14 +++++-----
.../starter/helix/HelixInstanceDataManager.java | 14 +++++-----
6 files changed, 42 insertions(+), 55 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java
index d6c8eccf60..31e32113e7 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.common.utils.config;
-import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -63,10 +62,6 @@ public final class TierConfigUtils {
return tierName == null ? "default" : tierName;
}
- public static String getDataDirForTier(TableConfig tableConfig, String tierName) {
- return getDataDirForTier(tableConfig, tierName, Collections.emptyMap());
- }
-
/**
* Consider configured tiers and compute default instance partitions for the segment
*
@@ -93,6 +88,12 @@ public final class TierConfigUtils {
return null;
}
+ @Nullable
+ public static String getDataDirForTier(TableConfig tableConfig, String tierName) {
+ return getDataDirForTier(tableConfig, tierName, Collections.emptyMap());
+ }
+
+ @Nullable
public static String getDataDirForTier(TableConfig tableConfig, String tierName,
Map<String, Map<String, String>> instanceTierConfigs) {
String tableNameWithType = tableConfig.getTableName();
@@ -108,17 +109,17 @@ public final class TierConfigUtils {
}
if (tierCfg != null) {
Map<String, String> backendProps = tierCfg.getTierBackendProperties();
- if (backendProps != null) {
- dataDir = backendProps.get(CommonConstants.Tier.BACKEND_PROP_DATA_DIR);
- } else {
+ if (backendProps == null) {
LOGGER.debug("No backend props for tier: {} in TableConfig of table: {}", tierName, tableNameWithType);
- }
- if (StringUtils.isNotEmpty(dataDir)) {
- LOGGER.debug("Got dataDir: {} for tier: {} in TableConfig of table: {}", dataDir, tierName,
- tableNameWithType);
- return dataDir;
} else {
- LOGGER.debug("No dataDir for tier: {} in TableConfig of table: {}", tierName, tableNameWithType);
+ dataDir = backendProps.get(CommonConstants.Tier.BACKEND_PROP_DATA_DIR);
+ if (StringUtils.isNotEmpty(dataDir)) {
+ LOGGER.debug("Got dataDir: {} for tier: {} in TableConfig of table: {}", dataDir, tierName,
+ tableNameWithType);
+ return dataDir;
+ } else {
+ LOGGER.debug("No dataDir for tier: {} in TableConfig of table: {}", tierName, tableNameWithType);
+ }
}
}
}
@@ -128,8 +129,6 @@ public final class TierConfigUtils {
// All instance config names are lower cased while being passed down here.
dataDir = instanceCfgs.get(CommonConstants.Tier.BACKEND_PROP_DATA_DIR.toLowerCase());
}
- Preconditions.checkState(StringUtils.isNotEmpty(dataDir), "No dataDir for tier: %s for table: %s", tierName,
- tableNameWithType);
LOGGER.debug("Got dataDir: {} for tier: {} for table: {} in instance configs", dataDir, tierName,
tableNameWithType);
return dataDir;
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/tier/TierConfigUtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/tier/TierConfigUtilsTest.java
index 2b942a1cab..75200fcb4e 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/tier/TierConfigUtilsTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/tier/TierConfigUtilsTest.java
@@ -192,30 +192,21 @@ public class TierConfigUtilsTest {
@Test
public void testGetDataDirForTier() {
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build();
- try {
- TierConfigUtils.getDataDirForTier(tableConfig, "tier1");
- } catch (Exception e) {
- Assert.assertEquals(e.getMessage(), "No dataDir for tier: tier1 for table: myTable_OFFLINE");
- }
+ String dataDir = TierConfigUtils.getDataDirForTier(tableConfig, "tier1");
+ Assert.assertNull(dataDir);
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTierConfigList(Lists
.newArrayList(new TierConfig("myTier", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "10d", null,
TierFactory.PINOT_SERVER_STORAGE_TYPE, "tag_OFFLINE", null, null))).build();
- try {
- TierConfigUtils.getDataDirForTier(tableConfig, "tier1");
- } catch (Exception e) {
- Assert.assertEquals(e.getMessage(), "No dataDir for tier: tier1 for table: myTable_OFFLINE");
- }
- try {
- TierConfigUtils.getDataDirForTier(tableConfig, "myTier");
- } catch (Exception e) {
- Assert.assertEquals(e.getMessage(), "No dataDir for tier: myTier for table: myTable_OFFLINE");
- }
+ dataDir = TierConfigUtils.getDataDirForTier(tableConfig, "tier1");
+ Assert.assertNull(dataDir);
+ dataDir = TierConfigUtils.getDataDirForTier(tableConfig, "myTier");
+ Assert.assertNull(dataDir);
// Provide instance tierConfigs for the tier.
Map<String, Map<String, String>> instanceTierConfigs = new HashMap<>();
Map<String, String> tierCfgMap = new HashMap<>();
tierCfgMap.put("datadir", "/abc/xyz");
instanceTierConfigs.put("myTier", tierCfgMap);
- String dataDir = TierConfigUtils.getDataDirForTier(tableConfig, "myTier", instanceTierConfigs);
+ dataDir = TierConfigUtils.getDataDirForTier(tableConfig, "myTier", instanceTierConfigs);
Assert.assertEquals(dataDir, "/abc/xyz");
// Table tierConfigs overwrite those from instance tierConfigs.
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTierConfigList(Lists
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 8595a434af..e519f2b8f8 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -242,7 +242,6 @@ public class ControllerConf extends PinotConfiguration {
private static final int DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = 24 * 60 * 60;
private static final int DEFAULT_SEGMENT_RELOCATOR_FREQUENCY_IN_SECONDS = 60 * 60;
- private static final int DEFAULT_SEGMENT_TIER_ASSIGNER_FREQUENCY_IN_SECONDS = -1; // Disabled
// Realtime Consumer Monitor
private static final String RT_CONSUMER_MONITOR_FREQUENCY_PERIOD =
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 9bc2c905bb..882af138d4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -41,6 +41,7 @@ import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -685,16 +686,13 @@ public abstract class BaseTableDataManager implements TableDataManager {
if (segmentTier == null) {
return getSegmentDataDir(segmentName);
}
- try {
- String tierDataDir =
- TierConfigUtils.getDataDirForTier(tableConfig, segmentTier, _tableDataManagerConfig.getInstanceTierConfigs());
- File tierTableDataDir = new File(tierDataDir, _tableNameWithType);
- return new File(tierTableDataDir, segmentName);
- } catch (Exception e) {
- LOGGER.warn("Failed to get dataDir for segment: {} of table: {} on tier: {} due to error: {}", segmentName,
- _tableNameWithType, segmentTier, e.getMessage());
+ String tierDataDir =
+ TierConfigUtils.getDataDirForTier(tableConfig, segmentTier, _tableDataManagerConfig.getInstanceTierConfigs());
+ if (StringUtils.isEmpty(tierDataDir)) {
return getSegmentDataDir(segmentName);
}
+ File tierTableDataDir = new File(tierDataDir, _tableNameWithType);
+ return new File(tierTableDataDir, segmentName);
}
@Nullable
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/TierBasedSegmentDirectoryLoader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/TierBasedSegmentDirectoryLoader.java
index e23e73df46..397aa79c56 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/TierBasedSegmentDirectoryLoader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/TierBasedSegmentDirectoryLoader.java
@@ -206,16 +206,14 @@ public class TierBasedSegmentDirectoryLoader implements SegmentDirectoryLoader {
TableConfig tableConfig = loaderContext.getTableConfig();
String tableNameWithType = tableConfig.getTableName();
String segmentName = loaderContext.getSegmentName();
- try {
- String tierDataDir =
- TierConfigUtils.getDataDirForTier(tableConfig, segmentTier, loaderContext.getInstanceTierConfigs());
- File tierTableDataDir = new File(tierDataDir, tableNameWithType);
- return new File(tierTableDataDir, segmentName);
- } catch (Exception e) {
- LOGGER.warn("Failed to get dataDir for segment: {} of table: {} on tier: {} due to error: {}", segmentName,
- tableNameWithType, segmentTier, e.getMessage());
+ String tierDataDir =
+ TierConfigUtils.getDataDirForTier(tableConfig, segmentTier, loaderContext.getInstanceTierConfigs());
+ if (StringUtils.isEmpty(tierDataDir)) {
+ LOGGER.warn("No dataDir for segment: {} of table: {} on tier: {}", segmentName, tableNameWithType, segmentTier);
return null;
}
+ File tierTableDataDir = new File(tierDataDir, tableNameWithType);
+ return new File(tierTableDataDir, segmentName);
}
@Override
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 6bfcf09952..92a82f133b 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -97,6 +98,7 @@ public class HelixInstanceDataManager implements InstanceDataManager {
// Fixed size LRU cache for storing last N errors on the instance.
// Key is TableNameWithType-SegmentName pair.
private LoadingCache<Pair<String, String>, SegmentErrorInfo> _errorCache;
+ private ExecutorService _segmentRefreshExecutor;
@Override
public void setSupplierOfIsServerReadyToServeQueries(Supplier<Boolean> isServingQueries) {
@@ -129,7 +131,10 @@ public class HelixInstanceDataManager implements InstanceDataManager {
// Initialize segment build time lease extender executor
SegmentBuildTimeLeaseExtender.initExecutor();
-
+ // Initialize a fixed thread pool to reload/refresh segments in parallel. The getMaxParallelRefreshThreads() is
+ // used to initialize a segment refresh semaphore to limit the parallelism, so create a pool of same size.
+ _segmentRefreshExecutor = Executors.newFixedThreadPool(getMaxParallelRefreshThreads(),
+ new ThreadFactoryBuilder().setNameFormat("segment-refresh-thread-%d").build());
// Initialize the table data manager provider
TableDataManagerProvider.init(_instanceDataManagerConfig);
LOGGER.info("Initialized Helix instance data manager");
@@ -184,6 +189,7 @@ public class HelixInstanceDataManager implements InstanceDataManager {
@Override
public synchronized void shutDown() {
+ _segmentRefreshExecutor.shutdownNow();
for (TableDataManager tableDataManager : _tableDataManagerMap.values()) {
tableDataManager.shutDown();
}
@@ -366,7 +372,6 @@ public class HelixInstanceDataManager implements InstanceDataManager {
Preconditions.checkNotNull(tableConfig);
Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableNameWithType);
List<String> failedSegments = new ArrayList<>();
- ExecutorService workers = Executors.newCachedThreadPool();
final AtomicReference<Exception> sampleException = new AtomicReference<>();
//calling thread hasn't acquired any permit so we don't reload any segments using it.
CompletableFuture.allOf(segmentsMetadata.stream().map(segmentMetadata -> CompletableFuture.runAsync(() -> {
@@ -383,10 +388,7 @@ public class HelixInstanceDataManager implements InstanceDataManager {
failedSegments.add(segmentName);
sampleException.set(e);
}
- }, workers)).toArray(CompletableFuture[]::new)).get();
-
- workers.shutdownNow();
-
+ }, _segmentRefreshExecutor)).toArray(CompletableFuture[]::new)).get();
if (sampleException.get() != null) {
throw new RuntimeException(
String.format("Failed to reload %d/%d segments: %s in table: %s", failedSegments.size(),
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org