You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/06/11 05:30:46 UTC

[incubator-pinot] branch add-logic-for-lead-controller-resource created (now 5f0e00d)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a change to branch add-logic-for-lead-controller-resource
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 5f0e00d  Add logic for leveraging lead controller resource

This branch includes the following new commits:

     new 5f0e00d  Add logic for leveraging lead controller resource

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Add logic for leveraging lead controller resource

Posted by jl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch add-logic-for-lead-controller-resource
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 5f0e00d9975a9d525157654a1219b675d85abaef
Author: jackjlli <jl...@linkedin.com>
AuthorDate: Mon Jun 10 22:30:24 2019 -0700

    Add logic for leveraging lead controller resource
---
 .../realtime/LLRealtimeSegmentDataManager.java     |  4 +-
 .../server/realtime/ControllerLeaderLocator.java   | 98 ++++++++++++++++++----
 .../ServerSegmentCompletionProtocolHandler.java    |  7 +-
 3 files changed, 90 insertions(+), 19 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 59d2bb8..2e2aced 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1028,6 +1028,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore();
     _segmentZKMetadata = (LLCRealtimeSegmentZKMetadata) segmentZKMetadata;
     _tableConfig = tableConfig;
+    _tableNameWithType = _tableConfig.getTableName();
     _realtimeTableDataManager = realtimeTableDataManager;
     _resourceDataDir = resourceDataDir;
     _indexLoadingConfig = indexLoadingConfig;
@@ -1036,7 +1037,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _segmentVersion = indexLoadingConfig.getSegmentVersion();
     _instanceId = _realtimeTableDataManager.getServerInstance();
     _leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(_instanceId);
-    _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics);
+    _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics, _tableNameWithType);
 
     // TODO Validate configs
     IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
@@ -1046,7 +1047,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _segmentNameStr = _segmentZKMetadata.getSegmentName();
     _segmentName = new LLCSegmentName(_segmentNameStr);
     _streamPartitionId = _segmentName.getPartitionId();
-    _tableNameWithType = _tableConfig.getTableName();
     _timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
     _metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" + _streamPartitionId;
     segmentLogger = LoggerFactory.getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr);
diff --git a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java
index 8a3b0fb..4352ee0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java
@@ -19,15 +19,19 @@
 package org.apache.pinot.server.realtime;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.util.Map;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.model.ExternalView;
 import org.apache.pinot.core.query.utils.Pair;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.common.utils.CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME;
+
 // Helix keeps the old controller around for 30s before electing a new one, so we will keep getting
 // the old controller as leader, and it will keep returning NOT_LEADER.
 
@@ -78,35 +82,99 @@ public class ControllerLeaderLocator {
   /**
    * Locate the controller leader so that we can send LLC segment completion requests to it.
    * Checks the {@link ControllerLeaderLocator::_cachedControllerLeaderInvalid} flag and fetches the leader from helix if cached value is invalid
-   *
+   * @param rawTableName table name without type.
    * @return The host:port string of the current controller leader.
    */
-  public synchronized Pair<String, Integer> getControllerLeader() {
+  public synchronized Pair<String, Integer> getControllerLeader(String rawTableName) {
     if (!_cachedControllerLeaderInvalid) {
       return _controllerLeaderHostPort;
     }
 
+    String leaderForTable = getLeaderForTable(rawTableName);
+    if (leaderForTable == null) {
+      LOGGER.warn("Failed to find a leader for Table: {}", rawTableName);
+      _cachedControllerLeaderInvalid = true;
+      return null;
+    } else {
+      _controllerLeaderHostPort = generateControllerLeaderHostPortPair(leaderForTable);
+      _cachedControllerLeaderInvalid = false;
+      LOGGER.info("Setting controller leader to be {}:{}", _controllerLeaderHostPort.getFirst(),
+          _controllerLeaderHostPort.getSecond());
+      return _controllerLeaderHostPort;
+    }
+  }
+
+  /**
+   * Returns the partition leader from the lead controller resource if one exists, for realtime segment completion.
+   * Otherwise falls back to the Helix cluster leader; returns null if neither can be found.
+   * @param rawTableName table name without type
+   * @return the leader for this table.
+   */
+  private String getLeaderForTable(String rawTableName) {
+    String leaderForTable;
+    ExternalView leadControllerResourceExternalView = _helixManager.getClusterManagmentTool().getResourceExternalView(_clusterName, LEAD_CONTROLLER_RESOURCE_NAME);
+    String partitionLeader = getPartitionLeader(leadControllerResourceExternalView, rawTableName);
+    if (partitionLeader != null) {
+      leaderForTable = partitionLeader;
+    } else {
+      // No partition leader was found; fall back to the Helix cluster leader for this table.
+      String helixLeader = getHelixClusterLeader();
+      if (helixLeader != null) {
+        leaderForTable = helixLeader;
+      } else {
+        leaderForTable = null;
+      }
+    }
+    return leaderForTable;
+  }
+
+  /**
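+   * Gets the partition leader for a table from the lead controller resource.
+   * If the resource is disabled or no controller is registered as a participant, no instance is in "MASTER" state.
+   * NOTE(review): the index is hashCode() % numPartitions, which is negative for negative hash codes - confirm.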
+   * @param leadControllerResourceExternalView external view of lead controller resource
+   * @param rawTableName table name without type
+   * @return leader of partition, null if not found.
+   */
+  private String getPartitionLeader(ExternalView leadControllerResourceExternalView, String rawTableName) {
+    int numPartitions = leadControllerResourceExternalView.getPartitionSet().size();
+    int partitionIndex = rawTableName.hashCode() % numPartitions;
+    String partitionName = LEAD_CONTROLLER_RESOURCE_NAME + "_" + partitionIndex;
+    Map<String, String> stateMap = leadControllerResourceExternalView.getStateMap(partitionName);
+
+    for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+      if ("MASTER".equals(entry.getValue())) {
+        return entry.getKey();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Gets the Helix cluster leader from the /CONTROLLER/LEADER znode. Returns null if it cannot be read.
+   * @return Helix leader.
+   */
+  private String getHelixClusterLeader() {
     BaseDataAccessor<ZNRecord> dataAccessor = _helixManager.getHelixDataAccessor().getBaseDataAccessor();
     Stat stat = new Stat();
     try {
-      ZNRecord znRecord =
-          dataAccessor.get("/" + _clusterName + "/CONTROLLER/LEADER", stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
-      String leader = znRecord.getId();
-      int index = leader.lastIndexOf('_');
-      String leaderHost = leader.substring(0, index);
-      int leaderPort = Integer.valueOf(leader.substring(index + 1));
-      _controllerLeaderHostPort = new Pair<>(leaderHost, leaderPort);
-      _cachedControllerLeaderInvalid = false;
-      LOGGER.info("Setting controller leader to be {}:{} as per znode version {}, mtime {}", leaderHost, leaderPort,
-          stat.getVersion(), stat.getMtime());
-      return _controllerLeaderHostPort;
+      ZNRecord znRecord = dataAccessor.get("/" + _clusterName + "/CONTROLLER/LEADER", stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
+      String helixLeader = znRecord.getId();
+      LOGGER.info("Getting Helix leader: {} as per znode version {}, mtime {}", helixLeader, stat.getVersion(), stat.getMtime());
+      return helixLeader;
     } catch (Exception e) {
-      LOGGER.warn("Could not locate controller leader, exception", e);
-      _cachedControllerLeaderInvalid = true;
+      LOGGER.warn("Could not locate Helix leader", e);
       return null;
     }
   }
 
+  private Pair<String, Integer> generateControllerLeaderHostPortPair(String controllerLeaderId) {
+    int index = controllerLeaderId.lastIndexOf('_');
+    String leaderHost = controllerLeaderId.substring(0, index);
+    int leaderPort = Integer.valueOf(controllerLeaderId.substring(index + 1));
+    return new Pair<>(leaderHost, leaderPort);
+  }
+
   /**
    * Invalidates the cached controller leader value by setting the {@link ControllerLeaderLocator::_cacheControllerLeadeInvalid} flag.
    * This flag is always checked first by {@link ControllerLeaderLocator::getControllerLeader()} method before returning the leader. If set, leader is fetched from helix, else cached leader value is returned.
diff --git a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
index bcfa7ee..5c75f1e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
@@ -23,6 +23,7 @@ import java.net.URI;
 import java.util.Map;
 import javax.net.ssl.SSLContext;
 import org.apache.commons.configuration.Configuration;
+import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
@@ -51,6 +52,7 @@ public class ServerSegmentCompletionProtocolHandler {
 
   private final FileUploadDownloadClient _fileUploadDownloadClient;
   private final ServerMetrics _serverMetrics;
+  private final String _rawTableName;
 
   public static void init(Configuration uploaderConfig) {
     Configuration httpsConfig = uploaderConfig.subset(HTTPS_PROTOCOL);
@@ -62,9 +64,10 @@ public class ServerSegmentCompletionProtocolHandler {
         uploaderConfig.getInt(CONFIG_OF_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS, DEFAULT_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS);
   }
 
-  public ServerSegmentCompletionProtocolHandler(ServerMetrics serverMetrics) {
+  public ServerSegmentCompletionProtocolHandler(ServerMetrics serverMetrics, String tableNameWithType) {
     _fileUploadDownloadClient = new FileUploadDownloadClient(_sslContext);
     _serverMetrics = serverMetrics;
+    _rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
   }
 
   public SegmentCompletionProtocol.Response segmentCommitStart(SegmentCompletionProtocol.Request.Params params) {
@@ -159,7 +162,7 @@ public class ServerSegmentCompletionProtocolHandler {
 
   private String createSegmentCompletionUrl(SegmentCompletionProtocol.Request request) {
     ControllerLeaderLocator leaderLocator = ControllerLeaderLocator.getInstance();
-    final Pair<String, Integer> leaderHostPort = leaderLocator.getControllerLeader();
+    final Pair<String, Integer> leaderHostPort = leaderLocator.getControllerLeader(_rawTableName);
     if (leaderHostPort == null) {
       LOGGER.warn("No leader found while trying to send {}", request.toString());
       return null;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org