You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/06/11 05:30:47 UTC
[incubator-pinot] 01/01: Add logic for leveraging lead controller
resource
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch add-logic-for-lead-controller-resource
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 5f0e00d9975a9d525157654a1219b675d85abaef
Author: jackjlli <jl...@linkedin.com>
AuthorDate: Mon Jun 10 22:30:24 2019 -0700
Add logic for leveraging lead controller resource
---
.../realtime/LLRealtimeSegmentDataManager.java | 4 +-
.../server/realtime/ControllerLeaderLocator.java | 98 ++++++++++++++++++----
.../ServerSegmentCompletionProtocolHandler.java | 7 +-
3 files changed, 90 insertions(+), 19 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 59d2bb8..2e2aced 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1028,6 +1028,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore();
_segmentZKMetadata = (LLCRealtimeSegmentZKMetadata) segmentZKMetadata;
_tableConfig = tableConfig;
+ _tableNameWithType = _tableConfig.getTableName();
_realtimeTableDataManager = realtimeTableDataManager;
_resourceDataDir = resourceDataDir;
_indexLoadingConfig = indexLoadingConfig;
@@ -1036,7 +1037,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_segmentVersion = indexLoadingConfig.getSegmentVersion();
_instanceId = _realtimeTableDataManager.getServerInstance();
_leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(_instanceId);
- _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics);
+ _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics, _tableNameWithType);
// TODO Validate configs
IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
@@ -1046,7 +1047,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_segmentNameStr = _segmentZKMetadata.getSegmentName();
_segmentName = new LLCSegmentName(_segmentNameStr);
_streamPartitionId = _segmentName.getPartitionId();
- _tableNameWithType = _tableConfig.getTableName();
_timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
_metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" + _streamPartitionId;
segmentLogger = LoggerFactory.getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr);
diff --git a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java
index 8a3b0fb..4352ee0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java
@@ -19,15 +19,19 @@
package org.apache.pinot.server.realtime;
import com.google.common.annotations.VisibleForTesting;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
+import org.apache.helix.model.ExternalView;
import org.apache.pinot.core.query.utils.Pair;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME;
+
// Helix keeps the old controller around for 30s before electing a new one, so we will keep getting
// the old controller as leader, and it will keep returning NOT_LEADER.
@@ -78,35 +82,99 @@ public class ControllerLeaderLocator {
+  /**
+   * Controller leader cached per raw table name. The leader is looked up per table (through the table's partition in
+   * the lead controller resource), so different tables may map to different controllers; a single shared cached value
+   * would send one table's requests to another table's leader. Cleared whenever {@code _cachedControllerLeaderInvalid}
+   * is found set. Only accessed from the synchronized {@code getControllerLeader} method.
+   */
+  private final Map<String, Pair<String, Integer>> _cachedControllerLeaderPerTable = new HashMap<>();
+
   /**
    * Locate the controller leader so that we can send LLC segment completion requests to it.
-   * Checks the {@link ControllerLeaderLocator::_cachedControllerLeaderInvalid} flag and fetches the leader from helix if cached value is invalid
-   *
-   * @return The host:port string of the current controller leader.
+   *
+   * <p>If the {@code _cachedControllerLeaderInvalid} flag is set, the cached leaders of all tables are dropped first.
+   * Otherwise a leader previously cached for {@code rawTableName} is returned without contacting Helix. On a cache miss
+   * the leader is looked up from Helix; if no leader can be found, or its id cannot be parsed into host and port, the
+   * flag is set (so the next call retries from scratch) and null is returned.
+   *
+   * @param rawTableName table name without type.
+   * @return the (host, port) of the controller leader for this table, or null if it cannot be located.
    */
-  public synchronized Pair<String, Integer> getControllerLeader() {
-    if (!_cachedControllerLeaderInvalid) {
-      return _controllerLeaderHostPort;
-    }
+  public synchronized Pair<String, Integer> getControllerLeader(String rawTableName) {
+    if (_cachedControllerLeaderInvalid) {
+      // The cached leaders were invalidated: forget every table's leader, not only this table's.
+      _cachedControllerLeaderPerTable.clear();
+    } else {
+      Pair<String, Integer> cachedLeaderHostPort = _cachedControllerLeaderPerTable.get(rawTableName);
+      if (cachedLeaderHostPort != null) {
+        return cachedLeaderHostPort;
+      }
+    }
+
+    String leaderForTable = getLeaderForTable(rawTableName);
+    if (leaderForTable == null) {
+      LOGGER.warn("Failed to find a leader for Table: {}", rawTableName);
+      _cachedControllerLeaderInvalid = true;
+      return null;
+    }
+
+    Pair<String, Integer> leaderHostPort;
+    try {
+      leaderHostPort = generateControllerLeaderHostPortPair(leaderForTable);
+    } catch (RuntimeException e) {
+      // A malformed leader id is reported like a missing leader rather than failing the caller with an exception.
+      LOGGER.warn("Could not parse controller leader id: {} for Table: {}", leaderForTable, rawTableName, e);
+      _cachedControllerLeaderInvalid = true;
+      return null;
+    }
+
+    _cachedControllerLeaderPerTable.put(rawTableName, leaderHostPort);
+    _controllerLeaderHostPort = leaderHostPort;
+    _cachedControllerLeaderInvalid = false;
+    LOGGER.info("Setting controller leader to be {}:{}", _controllerLeaderHostPort.getFirst(),
+        _controllerLeaderHostPort.getSecond());
+    return _controllerLeaderHostPort;
+  }
+
+  /**
+   * Finds the controller that should receive realtime segment completion requests for the given table.
+   * Prefers the MASTER of the table's partition in the lead controller resource. Falls back to the Helix cluster
+   * leader when that resource's external view is missing, has no MASTER for the partition, or cannot be read.
+   *
+   * @param rawTableName table name without type
+   * @return the leader instance id for this table, or null if neither leader can be located.
+   */
+  private String getLeaderForTable(String rawTableName) {
+    String partitionLeader = null;
+    try {
+      ExternalView leadControllerResourceExternalView =
+          _helixManager.getClusterManagmentTool().getResourceExternalView(_clusterName, LEAD_CONTROLLER_RESOURCE_NAME);
+      // NOTE(review): Helix returns a null external view when the resource does not exist; guard before using it.
+      if (leadControllerResourceExternalView != null) {
+        partitionLeader = getPartitionLeader(leadControllerResourceExternalView, rawTableName);
+      }
+    } catch (Exception e) {
+      // Keep segment completion working through the Helix leader instead of propagating ZK/Helix failures.
+      LOGGER.warn("Could not read external view of {} for Table: {}, falling back to Helix leader",
+          LEAD_CONTROLLER_RESOURCE_NAME, rawTableName, e);
+    }
+    if (partitionLeader != null) {
+      return partitionLeader;
+    }
+    // Get Helix leader to be the leader to this table (null if it cannot be located either).
+    return getHelixClusterLeader();
+  }
+
+  /**
+   * Gets partition leader from lead controller resource.
+   * If the resource is disabled or no controller registered as participant, there is no instance in "MASTER" state.
+   *
+   * @param leadControllerResourceExternalView external view of lead controller resource; may be null
+   * @param rawTableName table name without type
+   * @return leader of partition, null if not found (including a null or empty external view, or a partition with no
+   *         state map).
+   */
+  private String getPartitionLeader(ExternalView leadControllerResourceExternalView, String rawTableName) {
+    if (leadControllerResourceExternalView == null) {
+      return null;
+    }
+    int numPartitions = leadControllerResourceExternalView.getPartitionSet().size();
+    if (numPartitions == 0) {
+      // Avoid division by zero; there is no partition that could have a MASTER.
+      return null;
+    }
+    // String.hashCode() may be negative and Java's % keeps the sign of the dividend, so take the absolute value of the
+    // remainder, which always lies in [0, numPartitions).
+    // NOTE(review): this must match how controllers assign tables to lead controller partitions — confirm.
+    // NOTE(review): numPartitions comes from the external view, which may not list every partition of the
+    // resource (presumably only those with a current state) — consider using the configured partition count.
+    int partitionIndex = Math.abs(rawTableName.hashCode() % numPartitions);
+    String partitionName = LEAD_CONTROLLER_RESOURCE_NAME + "_" + partitionIndex;
+    Map<String, String> stateMap = leadControllerResourceExternalView.getStateMap(partitionName);
+    if (stateMap == null) {
+      return null;
+    }
+
+    for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+      if ("MASTER".equals(entry.getValue())) {
+        return entry.getKey();
+      }
+    }
+    return null;
+  }
+
+ /**
+ * Gets the Helix leader of the cluster by reading the "/<clusterName>/CONTROLLER/LEADER" ZNode.
+ * Only the raw leader id is returned; parsing it into host and port and caching it are left to the caller.
+ * Any exception while reading is logged and yields null (with AccessOption.THROW_EXCEPTION_IFNOTEXIST a missing
+ * ZNode presumably surfaces as such an exception — confirm against Helix).
+ * @return the Helix leader id (the leader ZNode record's id), or null if it cannot be located.
+ */
+ private String getHelixClusterLeader() {
BaseDataAccessor<ZNRecord> dataAccessor = _helixManager.getHelixDataAccessor().getBaseDataAccessor();
Stat stat = new Stat();
try {
- ZNRecord znRecord =
- dataAccessor.get("/" + _clusterName + "/CONTROLLER/LEADER", stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
- String leader = znRecord.getId();
- int index = leader.lastIndexOf('_');
- String leaderHost = leader.substring(0, index);
- int leaderPort = Integer.valueOf(leader.substring(index + 1));
- _controllerLeaderHostPort = new Pair<>(leaderHost, leaderPort);
- _cachedControllerLeaderInvalid = false;
- LOGGER.info("Setting controller leader to be {}:{} as per znode version {}, mtime {}", leaderHost, leaderPort,
- stat.getVersion(), stat.getMtime());
- return _controllerLeaderHostPort;
+ // stat is filled in by the read so the ZNode version and mtime can be logged.
+ ZNRecord znRecord = dataAccessor.get("/" + _clusterName + "/CONTROLLER/LEADER", stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
+ String helixLeader = znRecord.getId();
+ LOGGER.info("Getting Helix leader: {} as per znode version {}, mtime {}", helixLeader, stat.getVersion(), stat.getMtime());
+ return helixLeader;
} catch (Exception e) {
- LOGGER.warn("Could not locate controller leader, exception", e);
- _cachedControllerLeaderInvalid = true;
+ // Cache invalidation is no longer done here; the caller decides how to handle a null leader.
+ LOGGER.warn("Could not locate Helix leader", e);
return null;
}
}
+  /**
+   * Parses a controller instance id of the form {@code <host>_<port>} into a (host, port) pair. The id is split on the
+   * last '_' so that host names containing '_' are preserved.
+   *
+   * @param controllerLeaderId controller instance id
+   * @return the (host, port) pair
+   * @throws IllegalArgumentException if the id has no '_' separator, an empty host or port, or a non-numeric port
+   */
+  private Pair<String, Integer> generateControllerLeaderHostPortPair(String controllerLeaderId) {
+    int index = controllerLeaderId.lastIndexOf('_');
+    // index <= 0: no separator or empty host; last char: empty port. Fail with a clear message instead of a
+    // StringIndexOutOfBoundsException or an unusable empty host.
+    if (index <= 0 || index == controllerLeaderId.length() - 1) {
+      throw new IllegalArgumentException("Invalid controller leader id: " + controllerLeaderId);
+    }
+    String leaderHost = controllerLeaderId.substring(0, index);
+    int leaderPort;
+    try {
+      leaderPort = Integer.parseInt(controllerLeaderId.substring(index + 1));
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException("Invalid port in controller leader id: " + controllerLeaderId, e);
+    }
+    return new Pair<>(leaderHost, leaderPort);
+  }
+
/**
* Invalidates the cached controller leader value by setting the {@link ControllerLeaderLocator::_cacheControllerLeadeInvalid} flag.
* This flag is always checked first by {@link ControllerLeaderLocator::getControllerLeader()} method before returning the leader. If set, leader is fetched from helix, else cached leader value is returned.
diff --git a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
index bcfa7ee..5c75f1e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
@@ -23,6 +23,7 @@ import java.net.URI;
import java.util.Map;
import javax.net.ssl.SSLContext;
import org.apache.commons.configuration.Configuration;
+import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
@@ -51,6 +52,7 @@ public class ServerSegmentCompletionProtocolHandler {
private final FileUploadDownloadClient _fileUploadDownloadClient;
private final ServerMetrics _serverMetrics;
+ private final String _rawTableName;
public static void init(Configuration uploaderConfig) {
Configuration httpsConfig = uploaderConfig.subset(HTTPS_PROTOCOL);
@@ -62,9 +64,10 @@ public class ServerSegmentCompletionProtocolHandler {
uploaderConfig.getInt(CONFIG_OF_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS, DEFAULT_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS);
}
+ /**
+ * Creates a segment completion protocol handler bound to a single table.
+ *
+ * @param serverMetrics server metrics, kept in _serverMetrics for use by this handler
+ * @param tableNameWithType table name with type; only the raw table name extracted from it is kept, and it is used
+ * to look up the controller leader for this table when building segment completion URLs
+ */
- public ServerSegmentCompletionProtocolHandler(ServerMetrics serverMetrics) {
+ public ServerSegmentCompletionProtocolHandler(ServerMetrics serverMetrics, String tableNameWithType) {
_fileUploadDownloadClient = new FileUploadDownloadClient(_sslContext);
_serverMetrics = serverMetrics;
+ _rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
}
public SegmentCompletionProtocol.Response segmentCommitStart(SegmentCompletionProtocol.Request.Params params) {
@@ -159,7 +162,7 @@ public class ServerSegmentCompletionProtocolHandler {
private String createSegmentCompletionUrl(SegmentCompletionProtocol.Request request) {
ControllerLeaderLocator leaderLocator = ControllerLeaderLocator.getInstance();
- final Pair<String, Integer> leaderHostPort = leaderLocator.getControllerLeader();
+ final Pair<String, Integer> leaderHostPort = leaderLocator.getControllerLeader(_rawTableName);
if (leaderHostPort == null) {
LOGGER.warn("No leader found while trying to send {}", request.toString());
return null;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org