You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/06/11 06:24:37 UTC

[incubator-pinot] branch add-logic-for-lead-controller-resource updated (5f0e00d -> d07e0ba)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a change to branch add-logic-for-lead-controller-resource
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


 discard 5f0e00d  Add logic for leveraging lead controller resource
     new d07e0ba  Add logic for leveraging lead controller resource

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (5f0e00d)
            \
             N -- N -- N   refs/heads/add-logic-for-lead-controller-resource (d07e0ba)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../manager/realtime/RealtimeTableDataManager.java    |  2 +-
 .../realtime/SegmentBuildTimeLeaseExtender.java       |  8 ++++----
 .../server/realtime/ControllerLeaderLocator.java      | 10 +++++++++-
 .../realtime/LLRealtimeSegmentDataManagerTest.java    |  2 +-
 .../server/realtime/ControllerLeaderLocatorTest.java  | 19 +++++++++++++++----
 .../tests/SegmentCompletionIntegrationTests.java      |  2 +-
 6 files changed, 31 insertions(+), 12 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Add logic for leveraging lead controller resource

Posted by jl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch add-logic-for-lead-controller-resource
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit d07e0bafe6e369d9313089008359c51f09f302b4
Author: jackjlli <jl...@linkedin.com>
AuthorDate: Mon Jun 10 22:30:24 2019 -0700

    Add logic for leveraging lead controller resource
---
 .../realtime/LLRealtimeSegmentDataManager.java     |   4 +-
 .../manager/realtime/RealtimeTableDataManager.java |   2 +-
 .../realtime/SegmentBuildTimeLeaseExtender.java    |   8 +-
 .../server/realtime/ControllerLeaderLocator.java   | 106 ++++++++++++++++++---
 .../ServerSegmentCompletionProtocolHandler.java    |   7 +-
 .../realtime/LLRealtimeSegmentDataManagerTest.java |   2 +-
 .../realtime/ControllerLeaderLocatorTest.java      |  19 +++-
 .../tests/SegmentCompletionIntegrationTests.java   |   2 +-
 8 files changed, 120 insertions(+), 30 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 59d2bb8..2e2aced 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1028,6 +1028,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore();
     _segmentZKMetadata = (LLCRealtimeSegmentZKMetadata) segmentZKMetadata;
     _tableConfig = tableConfig;
+    _tableNameWithType = _tableConfig.getTableName();
     _realtimeTableDataManager = realtimeTableDataManager;
     _resourceDataDir = resourceDataDir;
     _indexLoadingConfig = indexLoadingConfig;
@@ -1036,7 +1037,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _segmentVersion = indexLoadingConfig.getSegmentVersion();
     _instanceId = _realtimeTableDataManager.getServerInstance();
     _leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(_instanceId);
-    _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics);
+    _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics, _tableNameWithType);
 
     // TODO Validate configs
     IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
@@ -1046,7 +1047,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _segmentNameStr = _segmentZKMetadata.getSegmentName();
     _segmentName = new LLCSegmentName(_segmentNameStr);
     _streamPartitionId = _segmentName.getPartitionId();
-    _tableNameWithType = _tableConfig.getTableName();
     _timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
     _metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" + _streamPartitionId;
     segmentLogger = LoggerFactory.getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index cde73a4..8b81dc6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -89,7 +89,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
 
   @Override
   protected void doInit() {
-    _leaseExtender = SegmentBuildTimeLeaseExtender.create(_instanceId, _serverMetrics);
+    _leaseExtender = SegmentBuildTimeLeaseExtender.create(_instanceId, _serverMetrics, _tableNameWithType);
 
     File statsFile = new File(_tableDataDir, STATS_FILE_NAME);
     try {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
index 6676619..fcd6422 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
@@ -57,20 +57,20 @@ public class SegmentBuildTimeLeaseExtender {
   }
 
   public static synchronized SegmentBuildTimeLeaseExtender create(final String instanceId,
-      ServerMetrics serverMetrics) {
+      ServerMetrics serverMetrics, String tableNameWithType) {
     SegmentBuildTimeLeaseExtender leaseExtender = INSTANCE_TO_LEASE_EXTENDER.get(instanceId);
     if (leaseExtender != null) {
       LOGGER.warn("Instance already exists");
     } else {
-      leaseExtender = new SegmentBuildTimeLeaseExtender(instanceId, serverMetrics);
+      leaseExtender = new SegmentBuildTimeLeaseExtender(instanceId, serverMetrics, tableNameWithType);
       INSTANCE_TO_LEASE_EXTENDER.put(instanceId, leaseExtender);
     }
     return leaseExtender;
   }
 
-  private SegmentBuildTimeLeaseExtender(String instanceId, ServerMetrics serverMetrics) {
+  private SegmentBuildTimeLeaseExtender(String instanceId, ServerMetrics serverMetrics, String tableNameWithType) {
     _instanceId = instanceId;
-    _protocolHandler = new ServerSegmentCompletionProtocolHandler(serverMetrics);
+    _protocolHandler = new ServerSegmentCompletionProtocolHandler(serverMetrics, tableNameWithType);
     _executor = new ScheduledThreadPoolExecutor(1);
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java
index 8a3b0fb..07b0087 100644
--- a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java
@@ -19,15 +19,20 @@
 package org.apache.pinot.server.realtime;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.util.Map;
+import java.util.Set;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.model.ExternalView;
 import org.apache.pinot.core.query.utils.Pair;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.common.utils.CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME;
+
 // Helix keeps the old controller around for 30s before electing a new one, so we will keep getting
 // the old controller as leader, and it will keep returning NOT_LEADER.
 
@@ -78,35 +83,106 @@ public class ControllerLeaderLocator {
   /**
    * Locate the controller leader so that we can send LLC segment completion requests to it.
    * Checks the {@link ControllerLeaderLocator::_cachedControllerLeaderInvalid} flag and fetches the leader from helix if cached value is invalid
-   *
+   * @param rawTableName table name without type.
    * @return The host:port string of the current controller leader.
    */
-  public synchronized Pair<String, Integer> getControllerLeader() {
+  public synchronized Pair<String, Integer> getControllerLeader(String rawTableName) {
     if (!_cachedControllerLeaderInvalid) {
       return _controllerLeaderHostPort;
     }
 
+    String leaderForTable = getLeaderForTable(rawTableName);
+    if (leaderForTable == null) {
+      LOGGER.warn("Failed to find a leader for Table: {}", rawTableName);
+      _cachedControllerLeaderInvalid = true;
+      return null;
+    } else {
+      _controllerLeaderHostPort = generateControllerLeaderHostPortPair(leaderForTable);
+      _cachedControllerLeaderInvalid = false;
+      LOGGER.info("Setting controller leader to be {}:{}", _controllerLeaderHostPort.getFirst(),
+          _controllerLeaderHostPort.getSecond());
+      return _controllerLeaderHostPort;
+    }
+  }
+
+  /**
+   * Picks the controller that handles realtime segment completion for the given table.
+   *
+   * The partition leader of the lead controller resource is preferred. If there is none,
+   * the Helix cluster leader is used instead.
+   *
+   * @param rawTableName table name without type
+   * @return the leader for this table, or null when neither lookup yields one.
+   */
+  private String getLeaderForTable(String rawTableName) {
+    // Preferred source: the partition leader recorded in the lead controller resource's external view.
+    ExternalView externalView =
+        _helixManager.getClusterManagmentTool().getResourceExternalView(_clusterName, LEAD_CONTROLLER_RESOURCE_NAME);
+    String partitionLeader = getPartitionLeader(externalView, rawTableName);
+    if (partitionLeader != null) {
+      return partitionLeader;
+    }
+
+    // Fallback: the Helix cluster leader.
+    // getHelixClusterLeader() already returns null when it cannot locate one.
+    String helixLeader = getHelixClusterLeader();
+    return helixLeader;
+  }
+
+  /**
+   * Gets partition leader from lead controller resource.
+   * If the resource is disabled or no controller registered as participant, there is no instance in "MASTER" state.
+   *
+   * @param leadControllerResourceExternalView external view of lead controller resource, may be null
+   * @param rawTableName table name without type
+   * @return leader of partition, null if not found.
+   */
+  private String getPartitionLeader(ExternalView leadControllerResourceExternalView, String rawTableName) {
+    if (leadControllerResourceExternalView == null) {
+      return null;
+    }
+    Set<String> partitionSet = leadControllerResourceExternalView.getPartitionSet();
+    if (partitionSet == null || partitionSet.isEmpty()) {
+      return null;
+    }
+    // String.hashCode() may be negative; floorMod keeps the partition index within [0, partitionSet.size()).
+    int partitionIndex = Math.floorMod(rawTableName.hashCode(), partitionSet.size());
+    Map<String, String> stateMap =
+        leadControllerResourceExternalView.getStateMap(LEAD_CONTROLLER_RESOURCE_NAME + "_" + partitionIndex);
+    // Guard against a missing state map for this partition name instead of failing with a NullPointerException.
+    if (stateMap != null) {
+      for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+        if ("MASTER".equals(entry.getValue())) {
+          return entry.getKey();
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Gets Helix leader in the cluster. Null if there is no leader.
+   * @return Helix leader.
+   */
+  private String getHelixClusterLeader() {
     BaseDataAccessor<ZNRecord> dataAccessor = _helixManager.getHelixDataAccessor().getBaseDataAccessor();
     Stat stat = new Stat();
     try {
-      ZNRecord znRecord =
-          dataAccessor.get("/" + _clusterName + "/CONTROLLER/LEADER", stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
-      String leader = znRecord.getId();
-      int index = leader.lastIndexOf('_');
-      String leaderHost = leader.substring(0, index);
-      int leaderPort = Integer.valueOf(leader.substring(index + 1));
-      _controllerLeaderHostPort = new Pair<>(leaderHost, leaderPort);
-      _cachedControllerLeaderInvalid = false;
-      LOGGER.info("Setting controller leader to be {}:{} as per znode version {}, mtime {}", leaderHost, leaderPort,
-          stat.getVersion(), stat.getMtime());
-      return _controllerLeaderHostPort;
+      ZNRecord znRecord = dataAccessor.get("/" + _clusterName + "/CONTROLLER/LEADER", stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
+      String helixLeader = znRecord.getId();
+      LOGGER.info("Getting Helix leader: {} as per znode version {}, mtime {}", helixLeader, stat.getVersion(), stat.getMtime());
+      return helixLeader;
     } catch (Exception e) {
-      LOGGER.warn("Could not locate controller leader, exception", e);
-      _cachedControllerLeaderInvalid = true;
+      LOGGER.warn("Could not locate Helix leader", e);
       return null;
     }
   }
 
+  private Pair<String, Integer> generateControllerLeaderHostPortPair(String controllerLeaderId) {
+    // Split on the LAST underscore: everything before it is taken as the host, everything after as the port.
+    int separatorIndex = controllerLeaderId.lastIndexOf('_');
+    String host = controllerLeaderId.substring(0, separatorIndex);
+    return new Pair<>(host, Integer.valueOf(controllerLeaderId.substring(separatorIndex + 1)));
+  }
+
   /**
    * Invalidates the cached controller leader value by setting the {@link ControllerLeaderLocator::_cacheControllerLeadeInvalid} flag.
    * This flag is always checked first by {@link ControllerLeaderLocator::getControllerLeader()} method before returning the leader. If set, leader is fetched from helix, else cached leader value is returned.
diff --git a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
index bcfa7ee..5c75f1e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
@@ -23,6 +23,7 @@ import java.net.URI;
 import java.util.Map;
 import javax.net.ssl.SSLContext;
 import org.apache.commons.configuration.Configuration;
+import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
@@ -51,6 +52,7 @@ public class ServerSegmentCompletionProtocolHandler {
 
   private final FileUploadDownloadClient _fileUploadDownloadClient;
   private final ServerMetrics _serverMetrics;
+  private final String _rawTableName;
 
   public static void init(Configuration uploaderConfig) {
     Configuration httpsConfig = uploaderConfig.subset(HTTPS_PROTOCOL);
@@ -62,9 +64,10 @@ public class ServerSegmentCompletionProtocolHandler {
         uploaderConfig.getInt(CONFIG_OF_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS, DEFAULT_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS);
   }
 
-  public ServerSegmentCompletionProtocolHandler(ServerMetrics serverMetrics) {
+  public ServerSegmentCompletionProtocolHandler(ServerMetrics serverMetrics, String tableNameWithType) {
     _fileUploadDownloadClient = new FileUploadDownloadClient(_sslContext);
     _serverMetrics = serverMetrics;
+    _rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
   }
 
   public SegmentCompletionProtocol.Response segmentCommitStart(SegmentCompletionProtocol.Request.Params params) {
@@ -159,7 +162,7 @@ public class ServerSegmentCompletionProtocolHandler {
 
   private String createSegmentCompletionUrl(SegmentCompletionProtocol.Request request) {
     ControllerLeaderLocator leaderLocator = ControllerLeaderLocator.getInstance();
-    final Pair<String, Integer> leaderHostPort = leaderLocator.getControllerLeader();
+    final Pair<String, Integer> leaderHostPort = leaderLocator.getControllerLeader(_rawTableName);
     if (leaderHostPort == null) {
       LOGGER.warn("No leader found while trying to send {}", request.toString());
       return null;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index 3d7bfce..8ff15b8 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -111,7 +111,7 @@ public class LLRealtimeSegmentDataManagerTest {
 
   private RealtimeTableDataManager createTableDataManager() {
     final String instanceId = "server-1";
-    SegmentBuildTimeLeaseExtender.create(instanceId, new ServerMetrics(new MetricsRegistry()));
+    SegmentBuildTimeLeaseExtender.create(instanceId, new ServerMetrics(new MetricsRegistry()), _tableName);
     RealtimeTableDataManager tableDataManager = mock(RealtimeTableDataManager.class);
     when(tableDataManager.getServerInstance()).thenReturn(instanceId);
     RealtimeSegmentStatsHistory statsHistory = mock(RealtimeSegmentStatsHistory.class);
diff --git a/pinot-core/src/test/java/org/apache/pinot/server/realtime/ControllerLeaderLocatorTest.java b/pinot-core/src/test/java/org/apache/pinot/server/realtime/ControllerLeaderLocatorTest.java
index 34e955d..5f0cdbc 100644
--- a/pinot-core/src/test/java/org/apache/pinot/server/realtime/ControllerLeaderLocatorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/server/realtime/ControllerLeaderLocatorTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.server.realtime;
 
 import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
@@ -35,6 +36,7 @@ import static org.mockito.Mockito.when;
 
 
 public class ControllerLeaderLocatorTest {
+  private String testTable = "testTable";
 
   /**
    * Tests the invalidate logic for cached controller leader
@@ -45,6 +47,7 @@ public class ControllerLeaderLocatorTest {
     HelixManager helixManager = mock(HelixManager.class);
     HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
     BaseDataAccessor<ZNRecord> baseDataAccessor = mock(BaseDataAccessor.class);
+    HelixAdmin helixAdmin = mock(HelixAdmin.class);
     ZNRecord znRecord = mock(ZNRecord.class);
     final String leaderHost = "host";
     final int leaderPort = 12345;
@@ -54,6 +57,8 @@ public class ControllerLeaderLocatorTest {
     when(znRecord.getId()).thenReturn(leaderHost + "_" + leaderPort);
     when(baseDataAccessor.get(anyString(), any(), anyInt())).thenReturn(znRecord);
     when(helixManager.getClusterName()).thenReturn("testCluster");
+    when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
+    when(helixAdmin.getResourceExternalView(anyString(), anyString())).thenReturn(null);
 
     // Create Controller Leader Locator
     FakeControllerLeaderLocator.create(helixManager);
@@ -78,7 +83,7 @@ public class ControllerLeaderLocatorTest {
     Assert.assertEquals(controllerLeaderLocator.getLastCacheInvalidateMillis(), lastCacheInvalidateMillis);
 
     // getControllerLeader, which validates the cache
-    controllerLeaderLocator.getControllerLeader();
+    controllerLeaderLocator.getControllerLeader(testTable);
     Assert.assertFalse(controllerLeaderLocator.isCachedControllerLeaderInvalid());
     Assert.assertEquals(controllerLeaderLocator.getLastCacheInvalidateMillis(), lastCacheInvalidateMillis);
 
@@ -104,16 +109,19 @@ public class ControllerLeaderLocatorTest {
     HelixManager helixManager = mock(HelixManager.class);
     HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
     BaseDataAccessor<ZNRecord> baseDataAccessor = mock(BaseDataAccessor.class);
+    HelixAdmin helixAdmin = mock(HelixAdmin.class);
 
     when(helixManager.getHelixDataAccessor()).thenReturn(helixDataAccessor);
     when(helixDataAccessor.getBaseDataAccessor()).thenReturn(baseDataAccessor);
     when(baseDataAccessor.get(anyString(), (Stat) any(), anyInt())).thenThrow(new RuntimeException());
+    when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
+    when(helixAdmin.getResourceExternalView(anyString(), anyString())).thenReturn(null);
 
     // Create Controller Leader Locator
     FakeControllerLeaderLocator.create(helixManager);
     ControllerLeaderLocator controllerLeaderLocator = FakeControllerLeaderLocator.getInstance();
 
-    Assert.assertEquals(controllerLeaderLocator.getControllerLeader(), null);
+    Assert.assertEquals(controllerLeaderLocator.getControllerLeader(testTable), null);
   }
 
   @Test
@@ -121,6 +129,7 @@ public class ControllerLeaderLocatorTest {
     HelixManager helixManager = mock(HelixManager.class);
     HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
     BaseDataAccessor<ZNRecord> baseDataAccessor = mock(BaseDataAccessor.class);
+    HelixAdmin helixAdmin = mock(HelixAdmin.class);
     ZNRecord znRecord = mock(ZNRecord.class);
     final String leaderHost = "host";
     final int leaderPort = 12345;
@@ -130,14 +139,16 @@ public class ControllerLeaderLocatorTest {
     when(znRecord.getId()).thenReturn(leaderHost + "_" + leaderPort);
     when(baseDataAccessor.get(anyString(), (Stat) any(), anyInt())).thenReturn(znRecord);
     when(helixManager.getClusterName()).thenReturn("myCluster");
+    when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
+    when(helixAdmin.getResourceExternalView(anyString(), anyString())).thenReturn(null);
 
     // Create Controller Leader Locator
     FakeControllerLeaderLocator.create(helixManager);
     ControllerLeaderLocator controllerLeaderLocator = FakeControllerLeaderLocator.getInstance();
 
     Pair<String, Integer> expectedLeaderLocation = new Pair<>(leaderHost, leaderPort);
-    Assert.assertEquals(controllerLeaderLocator.getControllerLeader().getFirst(), expectedLeaderLocation.getFirst());
-    Assert.assertEquals(controllerLeaderLocator.getControllerLeader().getSecond(), expectedLeaderLocation.getSecond());
+    Assert.assertEquals(controllerLeaderLocator.getControllerLeader(testTable).getFirst(), expectedLeaderLocation.getFirst());
+    Assert.assertEquals(controllerLeaderLocator.getControllerLeader(testTable).getSecond(), expectedLeaderLocation.getSecond());
   }
 
   static class FakeControllerLeaderLocator extends ControllerLeaderLocator {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.java
index 7e7b041..f6cd400 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.java
@@ -134,7 +134,7 @@ public class SegmentCompletionIntegrationTests extends LLCRealtimeClusterIntegra
 
     // Now report to the controller that we had to stop consumption
     ServerSegmentCompletionProtocolHandler protocolHandler =
-        new ServerSegmentCompletionProtocolHandler(new ServerMetrics(new MetricsRegistry()));
+        new ServerSegmentCompletionProtocolHandler(new ServerMetrics(new MetricsRegistry()), realtimeTableName);
     SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
     params.withOffset(45688L).withSegmentName(_currentSegment).withReason("RandomReason")
         .withInstanceId(_serverInstance);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org