You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/06/14 23:45:58 UTC

[incubator-pinot] 01/02: Add logic for leveraging lead controller resource

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch add-logic-for-lead-controller-resource
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit e87ee3966a57cb6943620f78a2b518961a09656a
Author: jackjlli <jl...@linkedin.com>
AuthorDate: Mon Jun 10 22:30:24 2019 -0700

    Add logic for leveraging lead controller resource
---
 .../realtime/LLRealtimeSegmentDataManager.java     |   4 +-
 .../manager/realtime/RealtimeTableDataManager.java |   2 +-
 .../realtime/SegmentBuildTimeLeaseExtender.java    |   8 +-
 .../server/realtime/ControllerLeaderLocator.java   | 106 ++++++++++++++++++---
 .../ServerSegmentCompletionProtocolHandler.java    |   7 +-
 .../realtime/LLRealtimeSegmentDataManagerTest.java |   2 +-
 .../realtime/ControllerLeaderLocatorTest.java      |  19 +++-
 .../tests/SegmentCompletionIntegrationTests.java   |   2 +-
 8 files changed, 120 insertions(+), 30 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 59d2bb8..2e2aced 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1028,6 +1028,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore();
     _segmentZKMetadata = (LLCRealtimeSegmentZKMetadata) segmentZKMetadata;
     _tableConfig = tableConfig;
+    _tableNameWithType = _tableConfig.getTableName();
     _realtimeTableDataManager = realtimeTableDataManager;
     _resourceDataDir = resourceDataDir;
     _indexLoadingConfig = indexLoadingConfig;
@@ -1036,7 +1037,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _segmentVersion = indexLoadingConfig.getSegmentVersion();
     _instanceId = _realtimeTableDataManager.getServerInstance();
     _leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(_instanceId);
-    _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics);
+    _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics, _tableNameWithType);
 
     // TODO Validate configs
     IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
@@ -1046,7 +1047,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _segmentNameStr = _segmentZKMetadata.getSegmentName();
     _segmentName = new LLCSegmentName(_segmentNameStr);
     _streamPartitionId = _segmentName.getPartitionId();
-    _tableNameWithType = _tableConfig.getTableName();
     _timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
     _metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" + _streamPartitionId;
     segmentLogger = LoggerFactory.getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index cde73a4..8b81dc6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -89,7 +89,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
 
   @Override
   protected void doInit() {
-    _leaseExtender = SegmentBuildTimeLeaseExtender.create(_instanceId, _serverMetrics);
+    _leaseExtender = SegmentBuildTimeLeaseExtender.create(_instanceId, _serverMetrics, _tableNameWithType);
 
     File statsFile = new File(_tableDataDir, STATS_FILE_NAME);
     try {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
index 6676619..fcd6422 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
@@ -57,20 +57,20 @@ public class SegmentBuildTimeLeaseExtender {
   }
 
   public static synchronized SegmentBuildTimeLeaseExtender create(final String instanceId,
-      ServerMetrics serverMetrics) {
+      ServerMetrics serverMetrics, String tableNameWithType) {
     SegmentBuildTimeLeaseExtender leaseExtender = INSTANCE_TO_LEASE_EXTENDER.get(instanceId);
     if (leaseExtender != null) {
       LOGGER.warn("Instance already exists");
     } else {
-      leaseExtender = new SegmentBuildTimeLeaseExtender(instanceId, serverMetrics);
+      leaseExtender = new SegmentBuildTimeLeaseExtender(instanceId, serverMetrics, tableNameWithType);
       INSTANCE_TO_LEASE_EXTENDER.put(instanceId, leaseExtender);
     }
     return leaseExtender;
   }
 
-  private SegmentBuildTimeLeaseExtender(String instanceId, ServerMetrics serverMetrics) {
+  private SegmentBuildTimeLeaseExtender(String instanceId, ServerMetrics serverMetrics, String tableNameWithType) {
     _instanceId = instanceId;
-    _protocolHandler = new ServerSegmentCompletionProtocolHandler(serverMetrics);
+    _protocolHandler = new ServerSegmentCompletionProtocolHandler(serverMetrics, tableNameWithType);
     _executor = new ScheduledThreadPoolExecutor(1);
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java
index 8a3b0fb..07b0087 100644
--- a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java
@@ -19,15 +19,20 @@
 package org.apache.pinot.server.realtime;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.util.Map;
+import java.util.Set;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.model.ExternalView;
 import org.apache.pinot.core.query.utils.Pair;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.common.utils.CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME;
+
 // Helix keeps the old controller around for 30s before electing a new one, so we will keep getting
 // the old controller as leader, and it will keep returning NOT_LEADER.
 
@@ -78,35 +83,106 @@ public class ControllerLeaderLocator {
   /**
    * Locate the controller leader so that we can send LLC segment completion requests to it.
    * Checks the {@link ControllerLeaderLocator::_cachedControllerLeaderInvalid} flag and fetches the leader from helix if cached value is invalid
-   *
+   * @param rawTableName table name without type.
    * @return The host:port string of the current controller leader.
    */
-  public synchronized Pair<String, Integer> getControllerLeader() {
+  public synchronized Pair<String, Integer> getControllerLeader(String rawTableName) {
     if (!_cachedControllerLeaderInvalid) {
       return _controllerLeaderHostPort;
     }
 
+    String leaderForTable = getLeaderForTable(rawTableName);
+    if (leaderForTable == null) {
+      LOGGER.warn("Failed to find a leader for Table: {}", rawTableName);
+      return null;
+    }
+    // NOTE(review): the cache holds a single leader, so once it is valid the same host:port is returned for
+    // every rawTableName until the cache is invalidated -- confirm this is intended with per-table leaders.
+    _controllerLeaderHostPort = generateControllerLeaderHostPortPair(leaderForTable);
+    _cachedControllerLeaderInvalid = false;
+    LOGGER.info("Setting controller leader to be {}:{}", _controllerLeaderHostPort.getFirst(),
+        _controllerLeaderHostPort.getSecond());
+    return _controllerLeaderHostPort;
+  }
+
+  /**
+   * Resolves the leader that should receive realtime segment completion requests for a table.
+   *
+   * <p>The lead controller resource is consulted first: when its external view reports a partition
+   * leader for the table, that leader is returned.
+   *
+   * <p>Otherwise the Helix cluster leader is used as the fallback.
+   *
+   * @param rawTableName table name without type
+   * @return the leader for this table, or null when neither lookup yields one.
+   */
+  private String getLeaderForTable(String rawTableName) {
+    ExternalView externalView =
+        _helixManager.getClusterManagmentTool().getResourceExternalView(_clusterName, LEAD_CONTROLLER_RESOURCE_NAME);
+    String partitionLeader = getPartitionLeader(externalView, rawTableName);
+    if (partitionLeader != null) {
+      return partitionLeader;
+    }
+
+    // No partition leader was found, so fall back to the Helix cluster leader.
+    // That lookup returns null when it cannot locate a leader, and the null is passed through to the caller.
+    return getHelixClusterLeader();
+  }
+
+  /**
+   * Gets partition leader from lead controller resource.
+   * If the resource is disabled or no controller registered as participant, there is no instance in "MASTER" state.
+   * Partition index = (hash of raw table name, sign bit masked) % partition count. NOTE(review): confirm controllers agree.
+   * @param leadControllerResourceExternalView external view of lead controller resource, may be null
+   * @param rawTableName table name without type
+   * @return leader of partition, null if not found.
+   */
+  private String getPartitionLeader(ExternalView leadControllerResourceExternalView, String rawTableName) {
+    Set<String> partitionSet =
+        leadControllerResourceExternalView == null ? null : leadControllerResourceExternalView.getPartitionSet();
+    if (partitionSet == null || partitionSet.isEmpty()) {
+      return null;
+    }
+    // String.hashCode() may be negative; mask the sign bit so the index stays within [0, partition count).
+    int partitionIndex = (rawTableName.hashCode() & Integer.MAX_VALUE) % partitionSet.size();
+    String partitionName = LEAD_CONTROLLER_RESOURCE_NAME + "_" + partitionIndex;
+    Map<String, String> stateMap = leadControllerResourceExternalView.getStateMap(partitionName);
+    if (stateMap == null) { // the computed partition has no entry in the external view
+      return null;
+    }
+    for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+      if ("MASTER".equals(entry.getValue())) {
+        return entry.getKey();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Gets Helix leader in the cluster. Null if there is no leader.
+   * @return Helix leader.
+   */
+  private String getHelixClusterLeader() {
     BaseDataAccessor<ZNRecord> dataAccessor = _helixManager.getHelixDataAccessor().getBaseDataAccessor();
     Stat stat = new Stat();
     try {
-      ZNRecord znRecord =
-          dataAccessor.get("/" + _clusterName + "/CONTROLLER/LEADER", stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
-      String leader = znRecord.getId();
-      int index = leader.lastIndexOf('_');
-      String leaderHost = leader.substring(0, index);
-      int leaderPort = Integer.valueOf(leader.substring(index + 1));
-      _controllerLeaderHostPort = new Pair<>(leaderHost, leaderPort);
-      _cachedControllerLeaderInvalid = false;
-      LOGGER.info("Setting controller leader to be {}:{} as per znode version {}, mtime {}", leaderHost, leaderPort,
-          stat.getVersion(), stat.getMtime());
-      return _controllerLeaderHostPort;
+      ZNRecord znRecord = dataAccessor.get("/" + _clusterName + "/CONTROLLER/LEADER", stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
+      String helixLeader = znRecord.getId();
+      LOGGER.info("Getting Helix leader: {} as per znode version {}, mtime {}", helixLeader, stat.getVersion(), stat.getMtime());
+      return helixLeader;
     } catch (Exception e) {
-      LOGGER.warn("Could not locate controller leader, exception", e);
-      _cachedControllerLeaderInvalid = true;
+      LOGGER.warn("Could not locate Helix leader", e);
       return null;
     }
   }
 
+  private Pair<String, Integer> generateControllerLeaderHostPortPair(String controllerLeaderId) {
+    // The id is parsed as "<host>_<port>", split on the LAST underscore so a host containing '_' stays intact.
+    int separatorIndex = controllerLeaderId.lastIndexOf('_');
+    String leaderHost = controllerLeaderId.substring(0, separatorIndex);
+    return new Pair<>(leaderHost, Integer.parseInt(controllerLeaderId.substring(separatorIndex + 1)));
+  }
+
   /**
    * Invalidates the cached controller leader value by setting the {@link ControllerLeaderLocator::_cacheControllerLeadeInvalid} flag.
    * This flag is always checked first by {@link ControllerLeaderLocator::getControllerLeader()} method before returning the leader. If set, leader is fetched from helix, else cached leader value is returned.
diff --git a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
index bcfa7ee..5c75f1e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
@@ -23,6 +23,7 @@ import java.net.URI;
 import java.util.Map;
 import javax.net.ssl.SSLContext;
 import org.apache.commons.configuration.Configuration;
+import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
@@ -51,6 +52,7 @@ public class ServerSegmentCompletionProtocolHandler {
 
   private final FileUploadDownloadClient _fileUploadDownloadClient;
   private final ServerMetrics _serverMetrics;
+  private final String _rawTableName;
 
   public static void init(Configuration uploaderConfig) {
     Configuration httpsConfig = uploaderConfig.subset(HTTPS_PROTOCOL);
@@ -62,9 +64,10 @@ public class ServerSegmentCompletionProtocolHandler {
         uploaderConfig.getInt(CONFIG_OF_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS, DEFAULT_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS);
   }
 
-  public ServerSegmentCompletionProtocolHandler(ServerMetrics serverMetrics) {
+  public ServerSegmentCompletionProtocolHandler(ServerMetrics serverMetrics, String tableNameWithType) {
     _fileUploadDownloadClient = new FileUploadDownloadClient(_sslContext);
     _serverMetrics = serverMetrics;
+    _rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
   }
 
   public SegmentCompletionProtocol.Response segmentCommitStart(SegmentCompletionProtocol.Request.Params params) {
@@ -159,7 +162,7 @@ public class ServerSegmentCompletionProtocolHandler {
 
   private String createSegmentCompletionUrl(SegmentCompletionProtocol.Request request) {
     ControllerLeaderLocator leaderLocator = ControllerLeaderLocator.getInstance();
-    final Pair<String, Integer> leaderHostPort = leaderLocator.getControllerLeader();
+    final Pair<String, Integer> leaderHostPort = leaderLocator.getControllerLeader(_rawTableName);
     if (leaderHostPort == null) {
       LOGGER.warn("No leader found while trying to send {}", request.toString());
       return null;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index 3d7bfce..8ff15b8 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -111,7 +111,7 @@ public class LLRealtimeSegmentDataManagerTest {
 
   private RealtimeTableDataManager createTableDataManager() {
     final String instanceId = "server-1";
-    SegmentBuildTimeLeaseExtender.create(instanceId, new ServerMetrics(new MetricsRegistry()));
+    SegmentBuildTimeLeaseExtender.create(instanceId, new ServerMetrics(new MetricsRegistry()), _tableName);
     RealtimeTableDataManager tableDataManager = mock(RealtimeTableDataManager.class);
     when(tableDataManager.getServerInstance()).thenReturn(instanceId);
     RealtimeSegmentStatsHistory statsHistory = mock(RealtimeSegmentStatsHistory.class);
diff --git a/pinot-core/src/test/java/org/apache/pinot/server/realtime/ControllerLeaderLocatorTest.java b/pinot-core/src/test/java/org/apache/pinot/server/realtime/ControllerLeaderLocatorTest.java
index 34e955d..5f0cdbc 100644
--- a/pinot-core/src/test/java/org/apache/pinot/server/realtime/ControllerLeaderLocatorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/server/realtime/ControllerLeaderLocatorTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.server.realtime;
 
 import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
@@ -35,6 +36,7 @@ import static org.mockito.Mockito.when;
 
 
 public class ControllerLeaderLocatorTest {
+  private String testTable = "testTable";
 
   /**
    * Tests the invalidate logic for cached controller leader
@@ -45,6 +47,7 @@ public class ControllerLeaderLocatorTest {
     HelixManager helixManager = mock(HelixManager.class);
     HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
     BaseDataAccessor<ZNRecord> baseDataAccessor = mock(BaseDataAccessor.class);
+    HelixAdmin helixAdmin = mock(HelixAdmin.class);
     ZNRecord znRecord = mock(ZNRecord.class);
     final String leaderHost = "host";
     final int leaderPort = 12345;
@@ -54,6 +57,8 @@ public class ControllerLeaderLocatorTest {
     when(znRecord.getId()).thenReturn(leaderHost + "_" + leaderPort);
     when(baseDataAccessor.get(anyString(), any(), anyInt())).thenReturn(znRecord);
     when(helixManager.getClusterName()).thenReturn("testCluster");
+    when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
+    when(helixAdmin.getResourceExternalView(anyString(), anyString())).thenReturn(null);
 
     // Create Controller Leader Locator
     FakeControllerLeaderLocator.create(helixManager);
@@ -78,7 +83,7 @@ public class ControllerLeaderLocatorTest {
     Assert.assertEquals(controllerLeaderLocator.getLastCacheInvalidateMillis(), lastCacheInvalidateMillis);
 
     // getControllerLeader, which validates the cache
-    controllerLeaderLocator.getControllerLeader();
+    controllerLeaderLocator.getControllerLeader(testTable);
     Assert.assertFalse(controllerLeaderLocator.isCachedControllerLeaderInvalid());
     Assert.assertEquals(controllerLeaderLocator.getLastCacheInvalidateMillis(), lastCacheInvalidateMillis);
 
@@ -104,16 +109,19 @@ public class ControllerLeaderLocatorTest {
     HelixManager helixManager = mock(HelixManager.class);
     HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
     BaseDataAccessor<ZNRecord> baseDataAccessor = mock(BaseDataAccessor.class);
+    HelixAdmin helixAdmin = mock(HelixAdmin.class);
 
     when(helixManager.getHelixDataAccessor()).thenReturn(helixDataAccessor);
     when(helixDataAccessor.getBaseDataAccessor()).thenReturn(baseDataAccessor);
     when(baseDataAccessor.get(anyString(), (Stat) any(), anyInt())).thenThrow(new RuntimeException());
+    when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
+    when(helixAdmin.getResourceExternalView(anyString(), anyString())).thenReturn(null);
 
     // Create Controller Leader Locator
     FakeControllerLeaderLocator.create(helixManager);
     ControllerLeaderLocator controllerLeaderLocator = FakeControllerLeaderLocator.getInstance();
 
-    Assert.assertEquals(controllerLeaderLocator.getControllerLeader(), null);
+    Assert.assertEquals(controllerLeaderLocator.getControllerLeader(testTable), null);
   }
 
   @Test
@@ -121,6 +129,7 @@ public class ControllerLeaderLocatorTest {
     HelixManager helixManager = mock(HelixManager.class);
     HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
     BaseDataAccessor<ZNRecord> baseDataAccessor = mock(BaseDataAccessor.class);
+    HelixAdmin helixAdmin = mock(HelixAdmin.class);
     ZNRecord znRecord = mock(ZNRecord.class);
     final String leaderHost = "host";
     final int leaderPort = 12345;
@@ -130,14 +139,16 @@ public class ControllerLeaderLocatorTest {
     when(znRecord.getId()).thenReturn(leaderHost + "_" + leaderPort);
     when(baseDataAccessor.get(anyString(), (Stat) any(), anyInt())).thenReturn(znRecord);
     when(helixManager.getClusterName()).thenReturn("myCluster");
+    when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
+    when(helixAdmin.getResourceExternalView(anyString(), anyString())).thenReturn(null);
 
     // Create Controller Leader Locator
     FakeControllerLeaderLocator.create(helixManager);
     ControllerLeaderLocator controllerLeaderLocator = FakeControllerLeaderLocator.getInstance();
 
     Pair<String, Integer> expectedLeaderLocation = new Pair<>(leaderHost, leaderPort);
-    Assert.assertEquals(controllerLeaderLocator.getControllerLeader().getFirst(), expectedLeaderLocation.getFirst());
-    Assert.assertEquals(controllerLeaderLocator.getControllerLeader().getSecond(), expectedLeaderLocation.getSecond());
+    Assert.assertEquals(controllerLeaderLocator.getControllerLeader(testTable).getFirst(), expectedLeaderLocation.getFirst());
+    Assert.assertEquals(controllerLeaderLocator.getControllerLeader(testTable).getSecond(), expectedLeaderLocation.getSecond());
   }
 
   static class FakeControllerLeaderLocator extends ControllerLeaderLocator {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.java
index 7e7b041..f6cd400 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.java
@@ -134,7 +134,7 @@ public class SegmentCompletionIntegrationTests extends LLCRealtimeClusterIntegra
 
     // Now report to the controller that we had to stop consumption
     ServerSegmentCompletionProtocolHandler protocolHandler =
-        new ServerSegmentCompletionProtocolHandler(new ServerMetrics(new MetricsRegistry()));
+        new ServerSegmentCompletionProtocolHandler(new ServerMetrics(new MetricsRegistry()), realtimeTableName);
     SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
     params.withOffset(45688L).withSegmentName(_currentSegment).withReason("RandomReason")
         .withInstanceId(_serverInstance);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org