You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/06/11 05:30:47 UTC

[incubator-pinot] 01/01: Add logic for leveraging lead controller resource

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch add-logic-for-lead-controller-resource
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 5f0e00d9975a9d525157654a1219b675d85abaef
Author: jackjlli <jl...@linkedin.com>
AuthorDate: Mon Jun 10 22:30:24 2019 -0700

    Add logic for leveraging lead controller resource
---
 .../realtime/LLRealtimeSegmentDataManager.java     |  4 +-
 .../server/realtime/ControllerLeaderLocator.java   | 98 ++++++++++++++++++----
 .../ServerSegmentCompletionProtocolHandler.java    |  7 +-
 3 files changed, 90 insertions(+), 19 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 59d2bb8..2e2aced 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1028,6 +1028,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore();
     _segmentZKMetadata = (LLCRealtimeSegmentZKMetadata) segmentZKMetadata;
     _tableConfig = tableConfig;
+    _tableNameWithType = _tableConfig.getTableName();
     _realtimeTableDataManager = realtimeTableDataManager;
     _resourceDataDir = resourceDataDir;
     _indexLoadingConfig = indexLoadingConfig;
@@ -1036,7 +1037,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _segmentVersion = indexLoadingConfig.getSegmentVersion();
     _instanceId = _realtimeTableDataManager.getServerInstance();
     _leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(_instanceId);
-    _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics);
+    _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics, _tableNameWithType);
 
     // TODO Validate configs
     IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
@@ -1046,7 +1047,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _segmentNameStr = _segmentZKMetadata.getSegmentName();
     _segmentName = new LLCSegmentName(_segmentNameStr);
     _streamPartitionId = _segmentName.getPartitionId();
-    _tableNameWithType = _tableConfig.getTableName();
     _timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
     _metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" + _streamPartitionId;
     segmentLogger = LoggerFactory.getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr);
diff --git a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java
index 8a3b0fb..4352ee0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java
@@ -19,15 +19,19 @@
 package org.apache.pinot.server.realtime;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.util.Map;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.model.ExternalView;
 import org.apache.pinot.core.query.utils.Pair;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.common.utils.CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME;
+
 // Helix keeps the old controller around for 30s before electing a new one, so we will keep getting
 // the old controller as leader, and it will keep returning NOT_LEADER.
 
@@ -78,35 +82,99 @@ public class ControllerLeaderLocator {
   /**
    * Locate the controller leader so that we can send LLC segment completion requests to it.
    * Checks the {@link ControllerLeaderLocator::_cachedControllerLeaderInvalid} flag and fetches the leader from helix if cached value is invalid
-   *
+   * @param rawTableName table name without type.
    * @return The host:port string of the current controller leader.
    */
-  public synchronized Pair<String, Integer> getControllerLeader() {
+  public synchronized Pair<String, Integer> getControllerLeader(String rawTableName) {
     if (!_cachedControllerLeaderInvalid) {
       return _controllerLeaderHostPort;
     }
 
+    String leaderForTable = getLeaderForTable(rawTableName);
+    if (leaderForTable == null) {
+      LOGGER.warn("Failed to find a leader for Table: {}", rawTableName);
+      _cachedControllerLeaderInvalid = true;
+      return null;
+    } else {
+      _controllerLeaderHostPort = generateControllerLeaderHostPortPair(leaderForTable);
+      _cachedControllerLeaderInvalid = false;
+      LOGGER.info("Setting controller leader to be {}:{}", _controllerLeaderHostPort.getFirst(),
+          _controllerLeaderHostPort.getSecond());
+      return _controllerLeaderHostPort;
+    }
+  }
+
+  /**
+   * Picks the controller that realtime segment completion requests for a table are sent to.
+   * <p>
+   * The leader of the table's partition in the lead controller resource is preferred.
+   * When no partition leader is found, the Helix cluster leader is used instead.
+   * The lookup is done on every call; nothing is cached here.
+   *
+   * @param rawTableName table name without type
+   * @return the leader for this table, or null if neither a partition leader nor a Helix leader is found.
+   */
+  private String getLeaderForTable(String rawTableName) {
+    ExternalView externalView =
+        _helixManager.getClusterManagmentTool().getResourceExternalView(_clusterName, LEAD_CONTROLLER_RESOURCE_NAME);
+
+    // First choice: the controller holding leadership of this table's partition.
+    String partitionLeader = getPartitionLeader(externalView, rawTableName);
+    if (partitionLeader != null) {
+      return partitionLeader;
+    }
+
+    // Otherwise use the Helix cluster leader, which is itself null when it cannot be located.
+    return getHelixClusterLeader();
+  }
+
+  /**
+   * Gets partition leader from lead controller resource.
+   * There is no partition leader when the external view is missing or has no partitions, or when the table's partition has no instance in "MASTER" state.
+   * @param leadControllerResourceExternalView external view of lead controller resource, may be null
+   * @param rawTableName table name without type
+   * @return leader of partition, null if not found.
+   */
+  private String getPartitionLeader(ExternalView leadControllerResourceExternalView, String rawTableName) {
+    if (leadControllerResourceExternalView == null || leadControllerResourceExternalView.getPartitionSet().isEmpty()) {
+      return null;
+    }
+    // String.hashCode() can be negative, so use floorMod to keep the partition index within [0, numPartitions).
+    // NOTE(review): the controller side must map tables to partitions the same way - TODO confirm.
+    int numPartitions = leadControllerResourceExternalView.getPartitionSet().size();
+    String partitionName = LEAD_CONTROLLER_RESOURCE_NAME + "_" + Math.floorMod(rawTableName.hashCode(), numPartitions);
+    Map<String, String> stateMap = leadControllerResourceExternalView.getStateMap(partitionName);
+    if (stateMap == null) {
+      return null;
+    }
+    return stateMap.entrySet().stream().filter(e -> "MASTER".equals(e.getValue())).map(Map.Entry::getKey).findFirst().orElse(null);
+  }
+
+  /**
+   * Gets Helix leader in the cluster. Null if there is no leader.
+   * @return Helix leader.
+   */
+  private String getHelixClusterLeader() {
     BaseDataAccessor<ZNRecord> dataAccessor = _helixManager.getHelixDataAccessor().getBaseDataAccessor();
     Stat stat = new Stat();
     try {
-      ZNRecord znRecord =
-          dataAccessor.get("/" + _clusterName + "/CONTROLLER/LEADER", stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
-      String leader = znRecord.getId();
-      int index = leader.lastIndexOf('_');
-      String leaderHost = leader.substring(0, index);
-      int leaderPort = Integer.valueOf(leader.substring(index + 1));
-      _controllerLeaderHostPort = new Pair<>(leaderHost, leaderPort);
-      _cachedControllerLeaderInvalid = false;
-      LOGGER.info("Setting controller leader to be {}:{} as per znode version {}, mtime {}", leaderHost, leaderPort,
-          stat.getVersion(), stat.getMtime());
-      return _controllerLeaderHostPort;
+      ZNRecord znRecord = dataAccessor.get("/" + _clusterName + "/CONTROLLER/LEADER", stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
+      String helixLeader = znRecord.getId();
+      LOGGER.info("Getting Helix leader: {} as per znode version {}, mtime {}", helixLeader, stat.getVersion(), stat.getMtime());
+      return helixLeader;
     } catch (Exception e) {
-      LOGGER.warn("Could not locate controller leader, exception", e);
-      _cachedControllerLeaderInvalid = true;
+      LOGGER.warn("Could not locate Helix leader", e);
       return null;
     }
   }
 
+  // Splits a controller id at its last '_' into a (host, port) pair, e.g. "host_9000" -> ("host", 9000).
+  private Pair<String, Integer> generateControllerLeaderHostPortPair(String controllerLeaderId) {
+    int separatorPos = controllerLeaderId.lastIndexOf('_');
+    String host = controllerLeaderId.substring(0, separatorPos);
+    return new Pair<>(host, Integer.valueOf(controllerLeaderId.substring(separatorPos + 1)));
+  }
+
   /**
    * Invalidates the cached controller leader value by setting the {@link ControllerLeaderLocator::_cacheControllerLeadeInvalid} flag.
    * This flag is always checked first by {@link ControllerLeaderLocator::getControllerLeader()} method before returning the leader. If set, leader is fetched from helix, else cached leader value is returned.
diff --git a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
index bcfa7ee..5c75f1e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
@@ -23,6 +23,7 @@ import java.net.URI;
 import java.util.Map;
 import javax.net.ssl.SSLContext;
 import org.apache.commons.configuration.Configuration;
+import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
@@ -51,6 +52,7 @@ public class ServerSegmentCompletionProtocolHandler {
 
   private final FileUploadDownloadClient _fileUploadDownloadClient;
   private final ServerMetrics _serverMetrics;
+  private final String _rawTableName;
 
   public static void init(Configuration uploaderConfig) {
     Configuration httpsConfig = uploaderConfig.subset(HTTPS_PROTOCOL);
@@ -62,9 +64,10 @@ public class ServerSegmentCompletionProtocolHandler {
         uploaderConfig.getInt(CONFIG_OF_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS, DEFAULT_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS);
   }
 
-  public ServerSegmentCompletionProtocolHandler(ServerMetrics serverMetrics) {
+  public ServerSegmentCompletionProtocolHandler(ServerMetrics serverMetrics, String tableNameWithType) {
     _fileUploadDownloadClient = new FileUploadDownloadClient(_sslContext);
     _serverMetrics = serverMetrics;
+    _rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
   }
 
   public SegmentCompletionProtocol.Response segmentCommitStart(SegmentCompletionProtocol.Request.Params params) {
@@ -159,7 +162,7 @@ public class ServerSegmentCompletionProtocolHandler {
 
   private String createSegmentCompletionUrl(SegmentCompletionProtocol.Request request) {
     ControllerLeaderLocator leaderLocator = ControllerLeaderLocator.getInstance();
-    final Pair<String, Integer> leaderHostPort = leaderLocator.getControllerLeader();
+    final Pair<String, Integer> leaderHostPort = leaderLocator.getControllerLeader(_rawTableName);
     if (leaderHostPort == null) {
       LOGGER.warn("No leader found while trying to send {}", request.toString());
       return null;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org