You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/04/18 21:10:00 UTC
[incubator-pinot] branch master updated: Remove singleton for
PinotLLCRealtimeSegmentManager and SegmentCompletionManager (#4102)
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 25c3686 Remove singleton for PinotLLCRealtimeSegmentManager and SegmentCompletionManager (#4102)
25c3686 is described below
commit 25c36869999ad12560ca3fa0a9c1319cfba5183c
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Thu Apr 18 14:09:56 2019 -0700
Remove singleton for PinotLLCRealtimeSegmentManager and SegmentCompletionManager (#4102)
---
.../apache/pinot/controller/ControllerStarter.java | 31 ++++++---
.../resources/LLCSegmentCompletionHandlers.java | 19 +++---
.../helix/core/PinotHelixResourceManager.java | 23 ++++---
.../helix/core/PinotTableIdealStateBuilder.java | 8 +--
.../realtime/PinotLLCRealtimeSegmentManager.java | 73 +++++++---------------
.../core/realtime/SegmentCompletionManager.java | 33 ++--------
.../rebalance/RebalanceSegmentStrategyFactory.java | 27 +-------
.../controller/helix/PinotControllerModeTest.java | 11 ++++
.../PinotLLCRealtimeSegmentManagerTest.java | 61 +++++++++++-------
.../helix/core/realtime/SegmentCompletionTest.java | 31 ++++-----
10 files changed, 147 insertions(+), 170 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index 4a900c6..fd3169c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -56,6 +56,7 @@ import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTaskScheduler;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.helix.core.realtime.PinotRealtimeSegmentManager;
+import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategyFactory;
import org.apache.pinot.controller.helix.core.relocation.RealtimeSegmentRelocator;
import org.apache.pinot.controller.helix.core.retention.RetentionManager;
@@ -108,6 +109,8 @@ public class ControllerStarter {
private ControllerPeriodicTaskScheduler _controllerPeriodicTaskScheduler;
private PinotHelixTaskResourceManager _helixTaskResourceManager;
private PinotRealtimeSegmentManager _realtimeSegmentsManager;
+ private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
+ private SegmentCompletionManager _segmentCompletionManager;
private ControllerLeadershipManager _controllerLeadershipManager;
private List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbackList;
@@ -259,8 +262,16 @@ public class ControllerStarter {
// Helix resource manager must be started in order to create PinotLLCRealtimeSegmentManager
LOGGER.info("Starting realtime segment manager");
- PinotLLCRealtimeSegmentManager
- .create(_helixResourceManager, _config, _controllerMetrics, _controllerLeadershipManager);
+
+ _pinotLLCRealtimeSegmentManager =
+ new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, _controllerMetrics,
+ _controllerLeadershipManager);
+ // TODO: Need to put this inside HelixResourceManager when ControllerLeadershipManager is removed.
+ _helixResourceManager.registerPinotLLCRealtimeSegmentManager(_pinotLLCRealtimeSegmentManager);
+ _segmentCompletionManager =
+ new SegmentCompletionManager(helixParticipantManager, _pinotLLCRealtimeSegmentManager, _controllerMetrics,
+ _controllerLeadershipManager, _config.getSegmentCommitTimeoutSeconds());
+
_realtimeSegmentsManager = new PinotRealtimeSegmentManager(_helixResourceManager, _controllerLeadershipManager);
_realtimeSegmentsManager.start(_controllerMetrics);
@@ -270,8 +281,9 @@ public class ControllerStarter {
_controllerPeriodicTaskScheduler = new ControllerPeriodicTaskScheduler();
_controllerPeriodicTaskScheduler.init(controllerPeriodicTasks, _controllerLeadershipManager);
- LOGGER.info("Creating rebalance segments factory");
- RebalanceSegmentStrategyFactory.createInstance(helixParticipantManager);
+ LOGGER.info("Registering rebalance segments factory");
+ _helixResourceManager
+ .registerRebalanceSegmentStrategyFactory(new RebalanceSegmentStrategyFactory(helixParticipantManager));
String accessControlFactoryClass = _config.getAccessControlFactoryClass();
LOGGER.info("Use class: {} as the AccessControlFactory", accessControlFactoryClass);
@@ -298,6 +310,7 @@ public class ControllerStarter {
bind(_config).to(ControllerConf.class);
bind(_helixResourceManager).to(PinotHelixResourceManager.class);
bind(_helixTaskResourceManager).to(PinotHelixTaskResourceManager.class);
+ bind(_segmentCompletionManager).to(SegmentCompletionManager.class);
bind(_taskManager).to(PinotTaskManager.class);
bind(connectionManager).to(HttpConnectionManager.class);
bind(_executorService).to(Executor.class);
@@ -418,8 +431,9 @@ public class ControllerStarter {
new OfflineSegmentIntervalChecker(_config, _helixResourceManager, new ValidationMetrics(_metricsRegistry),
_controllerMetrics);
periodicTasks.add(_offlineSegmentIntervalChecker);
- _realtimeSegmentValidationManager = new RealtimeSegmentValidationManager(_config, _helixResourceManager,
- PinotLLCRealtimeSegmentManager.getInstance(), new ValidationMetrics(_metricsRegistry), _controllerMetrics);
+ _realtimeSegmentValidationManager =
+ new RealtimeSegmentValidationManager(_config, _helixResourceManager, _pinotLLCRealtimeSegmentManager,
+ new ValidationMetrics(_metricsRegistry), _controllerMetrics);
periodicTasks.add(_realtimeSegmentValidationManager);
_brokerResourceValidationManager =
new BrokerResourceValidationManager(_config, _helixResourceManager, _controllerMetrics);
@@ -460,7 +474,7 @@ public class ControllerStarter {
// Stop PinotLLCSegmentManager before stopping Jersey API. It is possible that stopping Jersey API
// may interrupt the handlers waiting on an I/O.
- PinotLLCRealtimeSegmentManager.getInstance().stop();
+ _pinotLLCRealtimeSegmentManager.stop();
LOGGER.info("Closing PinotFS classes");
PinotFSFactory.shutdown();
@@ -474,9 +488,6 @@ public class ControllerStarter {
LOGGER.info("Stopping resource manager");
_helixResourceManager.stop();
- LOGGER.info("Stopping rebalance segments factory");
- RebalanceSegmentStrategyFactory.stop();
-
_executorService.shutdownNow();
} catch (final Exception e) {
LOGGER.error("Caught exception while shutting down", e);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index 19e2123..bbd70b3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -75,6 +75,9 @@ public class LLCSegmentCompletionHandlers {
@Inject
ControllerConf _controllerConf;
+ @Inject
+ SegmentCompletionManager _segmentCompletionManager;
+
@VisibleForTesting
public static String getScheme() {
return SCHEME;
@@ -104,7 +107,7 @@ public class LLCSegmentCompletionHandlers {
.withExtraTimeSec(extraTimeSec);
LOGGER.info("Processing extendBuildTime:{}", requestParams.toString());
- SegmentCompletionProtocol.Response response = SegmentCompletionManager.getInstance().extendBuildTime(requestParams);
+ SegmentCompletionProtocol.Response response = _segmentCompletionManager.extendBuildTime(requestParams);
final String responseStr = response.toJsonString();
LOGGER.info("Response to extendBuildTime:{}", responseStr);
@@ -130,7 +133,7 @@ public class LLCSegmentCompletionHandlers {
.withMemoryUsedBytes(memoryUsedBytes).withNumRows(numRows);
LOGGER.info("Processing segmentConsumed:{}", requestParams.toString());
- SegmentCompletionProtocol.Response response = SegmentCompletionManager.getInstance().segmentConsumed(requestParams);
+ SegmentCompletionProtocol.Response response = _segmentCompletionManager.segmentConsumed(requestParams);
final String responseStr = response.toJsonString();
LOGGER.info("Response to segmentConsumed:{}", responseStr);
return responseStr;
@@ -153,7 +156,7 @@ public class LLCSegmentCompletionHandlers {
LOGGER.info("Processing segmentStoppedConsuming:{}", requestParams.toString());
SegmentCompletionProtocol.Response response =
- SegmentCompletionManager.getInstance().segmentStoppedConsuming(requestParams);
+ _segmentCompletionManager.segmentStoppedConsuming(requestParams);
final String responseStr = response.toJsonString();
LOGGER.info("Response to segmentStoppedConsuming:{}", responseStr);
return responseStr;
@@ -183,7 +186,7 @@ public class LLCSegmentCompletionHandlers {
LOGGER.info("Processing segmentCommitStart:{}", requestParams.toString());
SegmentCompletionProtocol.Response response =
- SegmentCompletionManager.getInstance().segmentCommitStart(requestParams);
+ _segmentCompletionManager.segmentCommitStart(requestParams);
final String responseStr = response.toJsonString();
LOGGER.info("Response to segmentCommitStart:{}", responseStr);
return responseStr;
@@ -230,7 +233,7 @@ public class LLCSegmentCompletionHandlers {
CommittingSegmentDescriptor committingSegmentDescriptor =
CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams, segmentMetadata);
- SegmentCompletionProtocol.Response response = SegmentCompletionManager.getInstance()
+ SegmentCompletionProtocol.Response response = _segmentCompletionManager
.segmentCommitEnd(requestParams, isSuccess, isSplitCommit, committingSegmentDescriptor);
final String responseStr = response.toJsonString();
LOGGER.info("Response to segmentCommitEnd:{}", responseStr);
@@ -255,7 +258,7 @@ public class LLCSegmentCompletionHandlers {
.withNumRows(numRows).withMemoryUsedBytes(memoryUsedBytes);
LOGGER.info("Processing segmentCommit:{}", requestParams.toString());
- final SegmentCompletionManager segmentCompletionManager = SegmentCompletionManager.getInstance();
+ final SegmentCompletionManager segmentCompletionManager = _segmentCompletionManager;
SegmentCompletionProtocol.Response response = segmentCompletionManager.segmentCommitStart(requestParams);
CommittingSegmentDescriptor committingSegmentDescriptor =
@@ -297,7 +300,7 @@ public class LLCSegmentCompletionHandlers {
// check for existing segment file and remove it. So, the block cannot be removed altogether.
// For now, we live with these corner cases. Once we have split-commit enabled and working, this code will no longer
// be used.
- synchronized (SegmentCompletionManager.getInstance()) {
+ synchronized (_segmentCompletionManager) {
if (pinotFS.exists(segmentFileURI)) {
LOGGER.warn("Segment file {} exists. Replacing with upload from {} for segment {}",
segmentFileURI.toString(), instanceId, segmentName);
@@ -406,7 +409,7 @@ public class LLCSegmentCompletionHandlers {
LOGGER.error("Segment metadata extraction failure for segment {}", segmentName);
return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
}
- SegmentCompletionProtocol.Response response = SegmentCompletionManager.getInstance()
+ SegmentCompletionProtocol.Response response = _segmentCompletionManager
.segmentCommitEnd(requestParams, isSuccess, isSplitCommit,
CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams, segmentMetadata));
final String responseStr = response.toJsonString();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index a13fab3..ef94094 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -138,6 +138,8 @@ public class PinotHelixResourceManager {
private ZkCacheBaseDataAccessor<ZNRecord> _cacheInstanceConfigsDataAccessor;
private Builder _keyBuilder;
private SegmentDeletionManager _segmentDeletionManager;
+ private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
+ private RebalanceSegmentStrategyFactory _rebalanceSegmentStrategyFactory;
private TableRebalancer _tableRebalancer;
public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String helixClusterName,
@@ -1131,13 +1133,11 @@ public class PinotHelixResourceManager {
@VisibleForTesting
protected void validateTableTenantConfig(TableConfig tableConfig, String tableNameWithType, TableType tableType) {
if (tableConfig == null) {
- throw new InvalidTableConfigException(
- "Table config is null for table: " + tableNameWithType);
+ throw new InvalidTableConfigException("Table config is null for table: " + tableNameWithType);
}
TenantConfig tenantConfig = tableConfig.getTenantConfig();
if (tenantConfig == null || tenantConfig.getBroker() == null || tenantConfig.getServer() == null) {
- throw new InvalidTableConfigException(
- "Tenant is not configured for table: " + tableNameWithType);
+ throw new InvalidTableConfigException("Tenant is not configured for table: " + tableNameWithType);
}
// Check if tenant exists before creating the table
String brokerTenantName = TagNameUtils.getBrokerTagForTenant(tenantConfig.getBroker());
@@ -1236,6 +1236,14 @@ public class PinotHelixResourceManager {
}
}
+ public void registerPinotLLCRealtimeSegmentManager(PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager) {
+ _pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager;
+ }
+
+ public void registerRebalanceSegmentStrategyFactory(RebalanceSegmentStrategyFactory rebalanceSegmentStrategyFactory) {
+ _rebalanceSegmentStrategyFactory = rebalanceSegmentStrategyFactory;
+ }
+
private void ensureRealtimeClusterIsSetUp(TableConfig config, String realtimeTableName,
IndexingConfig indexingConfig) {
StreamConfig streamConfig = new StreamConfig(indexingConfig.getStreamConfigs());
@@ -1255,7 +1263,7 @@ public class PinotHelixResourceManager {
// Only high-level consumer specified in the config.
createHelixEntriesForHighLevelConsumer(config, realtimeTableName, idealState);
// Clean up any LLC table if they are present
- PinotLLCRealtimeSegmentManager.getInstance().cleanupLLC(realtimeTableName);
+ _pinotLLCRealtimeSegmentManager.cleanupLLC(realtimeTableName);
}
}
@@ -1265,7 +1273,8 @@ public class PinotHelixResourceManager {
// (unless there are low-level segments already present)
if (ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, realtimeTableName).isEmpty()) {
PinotTableIdealStateBuilder
- .buildLowLevelRealtimeIdealStateFor(realtimeTableName, config, idealState, _enableBatchMessageMode);
+ .buildLowLevelRealtimeIdealStateFor(_pinotLLCRealtimeSegmentManager, realtimeTableName, config, idealState,
+ _enableBatchMessageMode);
LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName);
} else {
LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName);
@@ -2234,7 +2243,7 @@ public class PinotHelixResourceManager {
RebalanceResult result;
try {
RebalanceSegmentStrategy rebalanceSegmentsStrategy =
- RebalanceSegmentStrategyFactory.getInstance().getRebalanceSegmentsStrategy(tableConfig);
+ _rebalanceSegmentStrategyFactory.getRebalanceSegmentsStrategy(tableConfig);
result = _tableRebalancer.rebalance(tableConfig, rebalanceSegmentsStrategy, rebalanceUserConfig);
} catch (InvalidConfigException e) {
LOGGER.error("Exception in rebalancing config for table {}", tableNameWithType, e);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 0e311e6..46eb8e4 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -118,8 +118,9 @@ public class PinotTableIdealStateBuilder {
return idealState;
}
- public static void buildLowLevelRealtimeIdealStateFor(String realtimeTableName, TableConfig realtimeTableConfig,
- IdealState idealState, boolean enableBatchMessageMode) {
+ public static void buildLowLevelRealtimeIdealStateFor(PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager,
+ String realtimeTableName, TableConfig realtimeTableConfig, IdealState idealState,
+ boolean enableBatchMessageMode) {
// Validate replicasPerPartition here.
final String replicasPerPartitionStr = realtimeTableConfig.getValidationConfig().getReplicasPerPartition();
@@ -136,9 +137,8 @@ public class PinotTableIdealStateBuilder {
if (idealState == null) {
idealState = buildEmptyRealtimeIdealStateFor(realtimeTableName, nReplicas, enableBatchMessageMode);
}
- final PinotLLCRealtimeSegmentManager segmentManager = PinotLLCRealtimeSegmentManager.getInstance();
try {
- segmentManager.setupNewTable(realtimeTableConfig, idealState);
+ pinotLLCRealtimeSegmentManager.setupNewTable(realtimeTableConfig, idealState);
} catch (InvalidConfigException e) {
throw new IllegalStateException("Caught exception when creating table " + realtimeTableName, e);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 324b9ae..c833261 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -111,9 +111,6 @@ public class PinotLLCRealtimeSegmentManager {
*/
private static int MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300_000; // 5 MINUTES
- // TODO: fix the misuse of singleton.
- private static PinotLLCRealtimeSegmentManager INSTANCE = null;
-
private final HelixAdmin _helixAdmin;
private final HelixManager _helixManager;
private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
@@ -131,6 +128,27 @@ public class PinotLLCRealtimeSegmentManager {
private volatile boolean _isStopping = false;
private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
+ public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
+ ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
+ _helixAdmin = helixResourceManager.getHelixAdmin();
+ _helixManager = helixResourceManager.getHelixZkManager();
+ _propertyStore = helixResourceManager.getPropertyStore();
+ _helixResourceManager = helixResourceManager;
+ _clusterName = helixResourceManager.getHelixClusterName();
+ _controllerConf = controllerConf;
+ _controllerMetrics = controllerMetrics;
+ _numIdealStateUpdateLocks = controllerConf.getRealtimeSegmentMetadataCommitNumLocks();
+ _idealstateUpdateLocks = new Lock[_numIdealStateUpdateLocks];
+ for (int i = 0; i < _numIdealStateUpdateLocks; i++) {
+ _idealstateUpdateLocks[i] = new ReentrantLock();
+ }
+ _tableConfigCache = new TableConfigCache(_propertyStore);
+ _streamPartitionAssignmentGenerator = new StreamPartitionAssignmentGenerator(_helixManager);
+ _flushThresholdUpdateManager = new FlushThresholdUpdateManager();
+ _controllerLeadershipManager = controllerLeadershipManager;
+ }
+
+
public boolean getIsSplitCommitEnabled() {
return _controllerConf.getAcceptSplitCommit();
}
@@ -139,25 +157,6 @@ public class PinotLLCRealtimeSegmentManager {
return _controllerConf.generateVipUrl();
}
- public static synchronized void create(PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
- ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
- create(helixResourceManager.getHelixAdmin(), helixResourceManager.getHelixClusterName(),
- helixResourceManager.getHelixZkManager(), helixResourceManager.getPropertyStore(), helixResourceManager,
- controllerConf, controllerMetrics, controllerLeadershipManager);
- }
-
- private static synchronized void create(HelixAdmin helixAdmin, String clusterName, HelixManager helixManager,
- ZkHelixPropertyStore propertyStore, PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
- ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
- if (INSTANCE != null) {
- throw new RuntimeException("Instance already created");
- }
- INSTANCE =
- new PinotLLCRealtimeSegmentManager(helixAdmin, clusterName, helixManager, propertyStore, helixResourceManager,
- controllerConf, controllerMetrics, controllerLeadershipManager);
- SegmentCompletionManager.create(helixManager, INSTANCE, controllerConf, controllerMetrics, controllerLeadershipManager);
- }
-
public void stop() {
_isStopping = true;
LOGGER
@@ -181,36 +180,6 @@ public class PinotLLCRealtimeSegmentManager {
}
}
LOGGER.info("Wait completed: Number of completing segments = {}", _numCompletingSegments.get());
- INSTANCE = null;
- SegmentCompletionManager.stop();
- }
-
- protected PinotLLCRealtimeSegmentManager(HelixAdmin helixAdmin, String clusterName, HelixManager helixManager,
- ZkHelixPropertyStore propertyStore, PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
- ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
- _helixAdmin = helixAdmin;
- _helixManager = helixManager;
- _propertyStore = propertyStore;
- _helixResourceManager = helixResourceManager;
- _clusterName = clusterName;
- _controllerConf = controllerConf;
- _controllerMetrics = controllerMetrics;
- _numIdealStateUpdateLocks = controllerConf.getRealtimeSegmentMetadataCommitNumLocks();
- _idealstateUpdateLocks = new Lock[_numIdealStateUpdateLocks];
- for (int i = 0; i < _numIdealStateUpdateLocks; i++) {
- _idealstateUpdateLocks[i] = new ReentrantLock();
- }
- _tableConfigCache = new TableConfigCache(_propertyStore);
- _streamPartitionAssignmentGenerator = new StreamPartitionAssignmentGenerator(_helixManager);
- _flushThresholdUpdateManager = new FlushThresholdUpdateManager();
- _controllerLeadershipManager = controllerLeadershipManager;
- }
-
- public static PinotLLCRealtimeSegmentManager getInstance() {
- if (INSTANCE == null) {
- throw new RuntimeException("Not yet created");
- }
- return INSTANCE;
}
protected boolean isLeader() {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 618c4a6..cfdd3c3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -64,9 +64,6 @@ public class SegmentCompletionManager {
ABORTED, // state machine is aborted. we will start a fresh one when the next segmentConsumed comes in.
}
- // TODO: fix the misuse of singleton.
- private static SegmentCompletionManager _instance = null;
-
private final HelixManager _helixManager;
// A map that holds the FSM for each segment.
private final Map<String, SegmentCompletionFSM> _fsmMap = new ConcurrentHashMap<>();
@@ -84,12 +81,15 @@ public class SegmentCompletionManager {
// TODO keep some history of past committed segments so that we can avoid looking up PROPERTYSTORE if some server comes in late.
- protected SegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
- ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
+ public SegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
+ ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager,
+ int segmentCommitTimeoutSeconds) {
_helixManager = helixManager;
_segmentManager = segmentManager;
_controllerMetrics = controllerMetrics;
_controllerLeadershipManager = controllerLeadershipManager;
+ SegmentCompletionProtocol
+ .setMaxSegmentCommitTimeMs(TimeUnit.MILLISECONDS.convert(segmentCommitTimeoutSeconds, TimeUnit.SECONDS));
}
public boolean isSplitCommitEnabled() {
@@ -100,25 +100,6 @@ public class SegmentCompletionManager {
return _segmentManager.getControllerVipUrl();
}
- public static SegmentCompletionManager create(HelixManager helixManager,
- PinotLLCRealtimeSegmentManager segmentManager, ControllerConf controllerConf,
- ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
- if (_instance != null) {
- throw new RuntimeException("Cannot create multiple instances");
- }
- _instance = new SegmentCompletionManager(helixManager, segmentManager, controllerMetrics, controllerLeadershipManager);
- SegmentCompletionProtocol.setMaxSegmentCommitTimeMs(
- TimeUnit.MILLISECONDS.convert(controllerConf.getSegmentCommitTimeoutSeconds(), TimeUnit.SECONDS));
- return _instance;
- }
-
- public static SegmentCompletionManager getInstance() {
- if (_instance == null) {
- throw new RuntimeException("Not yet created");
- }
- return _instance;
- }
-
protected long getCurrentTimeMs() {
return System.currentTimeMillis();
}
@@ -1118,10 +1099,6 @@ public class SegmentCompletionManager {
}
}
- public static void stop() {
- _instance = null;
- }
-
@VisibleForTesting
protected boolean isLeader() {
return _controllerLeadershipManager.isLeader();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSegmentStrategyFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSegmentStrategyFactory.java
index 30ac8af..7fe6ae8 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSegmentStrategyFactory.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSegmentStrategyFactory.java
@@ -24,35 +24,18 @@ import org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategy
/**
- * Singleton factory class, to fetch the right rebalance segments strategy based on table config
+ * This class is used to fetch the right rebalance segments strategy based on table config.
*/
public class RebalanceSegmentStrategyFactory {
- // TODO: fix the misuse of singleton.
- private static RebalanceSegmentStrategyFactory INSTANCE = null;
-
private HelixManager _helixManager;
- private RebalanceSegmentStrategyFactory(HelixManager helixManager) {
+ public RebalanceSegmentStrategyFactory(HelixManager helixManager) {
_helixManager = helixManager;
}
- public static void createInstance(HelixManager helixManager) {
- if (INSTANCE != null) {
- throw new RuntimeException("Instance already created for " + RebalanceSegmentStrategyFactory.class.getName());
- }
- INSTANCE = new RebalanceSegmentStrategyFactory(helixManager);
- }
-
- public static RebalanceSegmentStrategyFactory getInstance() {
- if (INSTANCE == null) {
- throw new RuntimeException("Instance not yet created for " + RebalanceSegmentStrategyFactory.class.getName());
- }
- return INSTANCE;
- }
-
public RebalanceSegmentStrategy getRebalanceSegmentsStrategy(TableConfig tableConfig) {
- // If we use replica group segment assignment strategy, we pick the replica group rebalancer
+ // If we use replica group segment assignment strategy, we pick the replica group rebalancer.
String segmentAssignmentStrategy = tableConfig.getValidationConfig().getSegmentAssignmentStrategy();
if (segmentAssignmentStrategy == null) {
return new DefaultRebalanceSegmentStrategy(_helixManager);
@@ -64,8 +47,4 @@ public class RebalanceSegmentStrategyFactory {
return new DefaultRebalanceSegmentStrategy(_helixManager);
}
}
-
- public static void stop() {
- INSTANCE = null;
- }
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
index 9bf70bd..4de3034 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
@@ -103,6 +103,17 @@ public class PinotControllerModeTest extends ControllerTest {
"Failed to start " + config.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms.");
Assert.assertEquals(_controllerStarter.getControllerMode(), ControllerConf.ControllerMode.PINOT_ONLY);
+ // Start a second Pinot only controller.
+ config.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++));
+ ControllerStarter secondControllerStarter = new TestOnlyControllerStarter(config);
+
+ secondControllerStarter.start();
+ // Two controller instances assigned to cluster.
+ TestUtils.waitForCondition(aVoid -> _helixResourceManager.getAllInstances().size() == 2, TIMEOUT_IN_MS,
+ "Failed to start the 2nd pinot only controller in " + TIMEOUT_IN_MS + "ms.");
+
+ secondControllerStarter.stop();
+
stopController();
_controllerStarter = null;
helixControllerStarter.stop();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 4f881ff..fd860c3 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -58,6 +58,7 @@ import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.ControllerLeadershipManager;
import org.apache.pinot.controller.api.resources.LLCSegmentCompletionHandlers;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.pinot.controller.util.SegmentCompletionUtils;
@@ -89,6 +90,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
private static File baseDir;
private Random random;
+ private PinotHelixResourceManager _mockPinotHelixResourceManager;
+
private enum ExternalChange {
N_INSTANCES_CHANGED, N_PARTITIONS_INCREASED, N_INSTANCES_CHANGED_AND_PARTITIONS_INCREASED
}
@@ -119,6 +122,10 @@ public class PinotLLCRealtimeSegmentManagerTest {
}
FakePinotLLCRealtimeSegmentManager.IS_CONNECTED = true;
FakePinotLLCRealtimeSegmentManager.IS_LEADER = true;
+
+ _mockPinotHelixResourceManager = mock(PinotHelixResourceManager.class);
+ HelixManager mockHelixManager = mock(HelixManager.class);
+ when(_mockPinotHelixResourceManager.getHelixZkManager()).thenReturn(mockHelixManager);
}
@AfterTest
@@ -193,7 +200,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
private void testSetupNewTable(TableConfig tableConfig, IdealState idealState, int nPartitions, int nReplicas,
List<String> instances, boolean invalidConfig, boolean badStream) {
- FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager(null);
+ FakePinotLLCRealtimeSegmentManager segmentManager =
+ new FakePinotLLCRealtimeSegmentManager(_mockPinotHelixResourceManager, null);
segmentManager._partitionAssignmentGenerator.setConsumingInstances(instances);
segmentManager.addTableToStore(tableConfig.getTableName(), tableConfig, nPartitions);
@@ -281,7 +289,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
@Test
public void testValidateLLCPartitionIncrease() {
- FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager(null);
+ FakePinotLLCRealtimeSegmentManager segmentManager =
+ new FakePinotLLCRealtimeSegmentManager(_mockPinotHelixResourceManager, null);
String tableName = "validateThisTable_REALTIME";
int nReplicas = 2;
IdealStateBuilderUtil idealStateBuilder = new IdealStateBuilderUtil(tableName);
@@ -470,7 +479,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
public void testValidateLLCRepair()
throws InvalidConfigException {
- FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager(null);
+ FakePinotLLCRealtimeSegmentManager segmentManager =
+ new FakePinotLLCRealtimeSegmentManager(_mockPinotHelixResourceManager, null);
String tableName = "repairThisTable_REALTIME";
String rawTableName = TableNameBuilder.extractRawTableName(tableName);
int nReplicas = 2;
@@ -855,7 +865,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
LLCSegmentName existingSegmentName = new LLCSegmentName("someTable", 1, 31, 12355L);
String[] existingSegs = {existingSegmentName.getSegmentName()};
FakePinotLLCRealtimeSegmentManager segmentManager =
- new FakePinotLLCRealtimeSegmentManager(Arrays.asList(existingSegs));
+ new FakePinotLLCRealtimeSegmentManager(_mockPinotHelixResourceManager, Arrays.asList(existingSegs));
final String rtTableName = "testPreExistingLLCSegments_REALTIME";
@@ -874,7 +884,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
@Test
public void testCommittingSegmentIfDisconnected()
throws InvalidConfigException {
- FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager(null);
+ FakePinotLLCRealtimeSegmentManager segmentManager =
+ new FakePinotLLCRealtimeSegmentManager(_mockPinotHelixResourceManager, null);
final String tableName = "table_REALTIME";
final String rawTableName = TableNameBuilder.extractRawTableName(tableName);
@@ -926,7 +937,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
@Test
public void testCommittingSegment()
throws InvalidConfigException {
- FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager(null);
+ FakePinotLLCRealtimeSegmentManager segmentManager =
+ new FakePinotLLCRealtimeSegmentManager(_mockPinotHelixResourceManager, null);
final String rtTableName = "table_REALTIME";
final String rawTableName = TableNameBuilder.extractRawTableName(rtTableName);
@@ -1083,11 +1095,14 @@ public class PinotLLCRealtimeSegmentManagerTest {
public void testCommitSegmentWhenControllerWentThroughGC()
throws InvalidConfigException {
- FakePinotLLCRealtimeSegmentManager segmentManager1 = new FakePinotLLCRealtimeSegmentManager(null);
- FakePinotLLCRealtimeSegmentManager segmentManager2 = new FakePinotLLCRealtimeSegmentManagerII(null,
- FakePinotLLCRealtimeSegmentManagerII.SCENARIO_1_ZK_VERSION_NUM_HAS_CHANGE);
- FakePinotLLCRealtimeSegmentManager segmentManager3 = new FakePinotLLCRealtimeSegmentManagerII(null,
- FakePinotLLCRealtimeSegmentManagerII.SCENARIO_2_METADATA_STATUS_HAS_CHANGE);
+ FakePinotLLCRealtimeSegmentManager segmentManager1 =
+ new FakePinotLLCRealtimeSegmentManager(_mockPinotHelixResourceManager, null);
+ FakePinotLLCRealtimeSegmentManager segmentManager2 =
+ new FakePinotLLCRealtimeSegmentManagerII(_mockPinotHelixResourceManager, null,
+ FakePinotLLCRealtimeSegmentManagerII.SCENARIO_1_ZK_VERSION_NUM_HAS_CHANGE);
+ FakePinotLLCRealtimeSegmentManager segmentManager3 =
+ new FakePinotLLCRealtimeSegmentManagerII(_mockPinotHelixResourceManager, null,
+ FakePinotLLCRealtimeSegmentManagerII.SCENARIO_2_METADATA_STATUS_HAS_CHANGE);
final String rtTableName = "table_REALTIME";
final String rawTableName = TableNameBuilder.extractRawTableName(rtTableName);
@@ -1123,7 +1138,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
@Test
public void testIdealStateAlreadyUpdated()
throws InvalidConfigException {
- FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager(null);
+ FakePinotLLCRealtimeSegmentManager segmentManager =
+ new FakePinotLLCRealtimeSegmentManager(_mockPinotHelixResourceManager, null);
String tableNameWithType = "tableName_REALTIME";
String rawTableName = "tableName";
int nPartitions = 4;
@@ -1199,7 +1215,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
throws Exception {
PinotFSFactory.init(new PropertiesConfiguration());
PinotLLCRealtimeSegmentManager realtimeSegmentManager =
- new FakePinotLLCRealtimeSegmentManager(Collections.<String>emptyList());
+ new FakePinotLLCRealtimeSegmentManager(_mockPinotHelixResourceManager, Collections.<String>emptyList());
String tableName = "fakeTable_REALTIME";
String segmentName = "segment";
String temporarySegmentLocation = SegmentCompletionUtils.generateSegmentFileName(segmentName);
@@ -1219,7 +1235,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
throws Exception {
PinotFSFactory.init(new PropertiesConfiguration());
PinotLLCRealtimeSegmentManager realtimeSegmentManager =
- new FakePinotLLCRealtimeSegmentManager(Collections.<String>emptyList());
+ new FakePinotLLCRealtimeSegmentManager(_mockPinotHelixResourceManager, Collections.<String>emptyList());
String tableName = "fakeTable_REALTIME";
String segmentName = "segment";
@@ -1341,9 +1357,11 @@ public class PinotLLCRealtimeSegmentManagerTest {
private TableConfigStore _tableConfigStore;
- protected FakePinotLLCRealtimeSegmentManager(List<String> existingLLCSegments, HelixManager helixManager) {
- super(null, clusterName, helixManager, null, null, CONTROLLER_CONF, new ControllerMetrics(new MetricsRegistry()),
- new ControllerLeadershipManager(helixManager));
+ protected FakePinotLLCRealtimeSegmentManager(PinotHelixResourceManager pinotHelixResourceManager,
+ List<String> existingLLCSegments) {
+ super(pinotHelixResourceManager, CONTROLLER_CONF, new ControllerMetrics(new MetricsRegistry()),
+ new ControllerLeadershipManager(pinotHelixResourceManager.getHelixZkManager()));
+
try {
TableConfigCache mockCache = mock(TableConfigCache.class);
TableConfig mockTableConfig = mock(TableConfig.class);
@@ -1378,10 +1396,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
_tableConfigStore = new TableConfigStore();
}
- protected FakePinotLLCRealtimeSegmentManager(List<String> existingLLCSegments) {
- this(existingLLCSegments, mock(HelixManager.class));
- }
-
private SegmentMetadataImpl newMockSegmentMetadata() {
segmentMetadata = mock(SegmentMetadataImpl.class);
when(segmentMetadata.getCrc()).thenReturn(FakePinotLLCRealtimeSegmentManager.CRC);
@@ -1574,8 +1588,9 @@ public class PinotLLCRealtimeSegmentManagerTest {
private int _scenario;
- FakePinotLLCRealtimeSegmentManagerII(List<String> existingLLCSegments, int scenario) {
- super(existingLLCSegments);
+ FakePinotLLCRealtimeSegmentManagerII(PinotHelixResourceManager pinotHelixResourceManager,
+ List<String> existingLLCSegments, int scenario) {
+ super(pinotHelixResourceManager, existingLLCSegments);
_scenario = scenario;
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index 0dfe6b4..e13fc87 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -31,6 +31,7 @@ import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.zookeeper.data.Stat;
import org.testng.Assert;
@@ -39,6 +40,7 @@ import org.testng.annotations.Test;
import static org.apache.pinot.common.protocols.SegmentCompletionProtocol.ControllerResponseStatus;
import static org.apache.pinot.common.protocols.SegmentCompletionProtocol.Request;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -65,8 +67,10 @@ public class SegmentCompletionTest {
public void testCaseSetup(boolean isLeader, boolean isConnected)
throws Exception {
- segmentManager = new MockPinotLLCRealtimeSegmentManager(isLeader, isConnected);
- ControllerLeadershipManager controllerLeadershipManager = segmentManager.getControllerLeadershipManager();
+ PinotHelixResourceManager mockPinotHelixResourceManager = mock(PinotHelixResourceManager.class);
+ HelixManager mockHelixManager = createMockHelixManager(isLeader, isConnected);
+ when(mockPinotHelixResourceManager.getHelixZkManager()).thenReturn(mockHelixManager);
+ segmentManager = new MockPinotLLCRealtimeSegmentManager(mockPinotHelixResourceManager);
final int partitionId = 23;
final int seqId = 12;
final long now = System.currentTimeMillis();
@@ -78,8 +82,7 @@ public class SegmentCompletionTest {
metadata.setNumReplicas(3);
segmentManager._segmentMetadata = metadata;
- segmentCompletionMgr =
- new MockSegmentCompletionManager(segmentManager, isLeader, isConnected);
+ segmentCompletionMgr = new MockSegmentCompletionManager(segmentManager, isLeader, isConnected);
segmentManager._segmentCompletionMgr = segmentCompletionMgr;
Field fsmMapField = SegmentCompletionManager.class.getDeclaredField("_fsmMap");
@@ -1142,9 +1145,9 @@ public class SegmentCompletionTest {
public String _stoppedInstance;
public HelixManager _helixManager = mock(HelixManager.class);
- protected MockPinotLLCRealtimeSegmentManager(boolean isLeader, boolean isConnected) {
- super(null, clusterName, null, null, null, CONTROLLER_CONF, new ControllerMetrics(new MetricsRegistry()),
- new ControllerLeadershipManager(createMockHelixManager(isLeader, isConnected)));
+ protected MockPinotLLCRealtimeSegmentManager(PinotHelixResourceManager pinotHelixResourceManager) {
+ super(pinotHelixResourceManager, CONTROLLER_CONF, new ControllerMetrics(new MetricsRegistry()),
+ new ControllerLeadershipManager(pinotHelixResourceManager.getHelixZkManager()));
}
@Override
@@ -1191,18 +1194,18 @@ public class SegmentCompletionTest {
protected MockSegmentCompletionManager(PinotLLCRealtimeSegmentManager segmentManager, boolean isLeader,
boolean isConnected) {
- this(createMockHelixManager(isLeader, isConnected), segmentManager, isLeader, isConnected);
+ this(createMockHelixManager(isLeader, isConnected), segmentManager, isLeader);
}
- protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager, boolean isLeader,
- boolean isConnected) {
- this(helixManager, segmentManager, isLeader, isConnected, new ControllerLeadershipManager(helixManager));
+ protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
+ boolean isLeader) {
+ this(helixManager, segmentManager, isLeader, new ControllerLeadershipManager(helixManager));
}
protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
- boolean isLeader, boolean isConnected, ControllerLeadershipManager controllerLeadershipManager) {
- super(helixManager, segmentManager, new ControllerMetrics(new MetricsRegistry()),
- controllerLeadershipManager);
+ boolean isLeader, ControllerLeadershipManager controllerLeadershipManager) {
+ super(helixManager, segmentManager, new ControllerMetrics(new MetricsRegistry()), controllerLeadershipManager,
+ SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds());
_isLeader = isLeader;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org