You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/06/14 23:45:58 UTC
[incubator-pinot] 01/02: Add logic for leveraging lead controller
resource
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch add-logic-for-lead-controller-resource
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit e87ee3966a57cb6943620f78a2b518961a09656a
Author: jackjlli <jl...@linkedin.com>
AuthorDate: Mon Jun 10 22:30:24 2019 -0700
Add logic for leveraging lead controller resource
---
.../realtime/LLRealtimeSegmentDataManager.java | 4 +-
.../manager/realtime/RealtimeTableDataManager.java | 2 +-
.../realtime/SegmentBuildTimeLeaseExtender.java | 8 +-
.../server/realtime/ControllerLeaderLocator.java | 106 ++++++++++++++++++---
.../ServerSegmentCompletionProtocolHandler.java | 7 +-
.../realtime/LLRealtimeSegmentDataManagerTest.java | 2 +-
.../realtime/ControllerLeaderLocatorTest.java | 19 +++-
.../tests/SegmentCompletionIntegrationTests.java | 2 +-
8 files changed, 120 insertions(+), 30 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 59d2bb8..2e2aced 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1028,6 +1028,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore();
_segmentZKMetadata = (LLCRealtimeSegmentZKMetadata) segmentZKMetadata;
_tableConfig = tableConfig;
+ _tableNameWithType = _tableConfig.getTableName();
_realtimeTableDataManager = realtimeTableDataManager;
_resourceDataDir = resourceDataDir;
_indexLoadingConfig = indexLoadingConfig;
@@ -1036,7 +1037,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_segmentVersion = indexLoadingConfig.getSegmentVersion();
_instanceId = _realtimeTableDataManager.getServerInstance();
_leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(_instanceId);
- _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics);
+ _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics, _tableNameWithType);
// TODO Validate configs
IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
@@ -1046,7 +1047,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_segmentNameStr = _segmentZKMetadata.getSegmentName();
_segmentName = new LLCSegmentName(_segmentNameStr);
_streamPartitionId = _segmentName.getPartitionId();
- _tableNameWithType = _tableConfig.getTableName();
_timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
_metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" + _streamPartitionId;
segmentLogger = LoggerFactory.getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index cde73a4..8b81dc6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -89,7 +89,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
@Override
protected void doInit() {
- _leaseExtender = SegmentBuildTimeLeaseExtender.create(_instanceId, _serverMetrics);
+ _leaseExtender = SegmentBuildTimeLeaseExtender.create(_instanceId, _serverMetrics, _tableNameWithType);
File statsFile = new File(_tableDataDir, STATS_FILE_NAME);
try {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
index 6676619..fcd6422 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
@@ -57,20 +57,20 @@ public class SegmentBuildTimeLeaseExtender {
}
public static synchronized SegmentBuildTimeLeaseExtender create(final String instanceId,
- ServerMetrics serverMetrics) {
+ ServerMetrics serverMetrics, String tableNameWithType) {
SegmentBuildTimeLeaseExtender leaseExtender = INSTANCE_TO_LEASE_EXTENDER.get(instanceId);
if (leaseExtender != null) {
LOGGER.warn("Instance already exists");
} else {
- leaseExtender = new SegmentBuildTimeLeaseExtender(instanceId, serverMetrics);
+ leaseExtender = new SegmentBuildTimeLeaseExtender(instanceId, serverMetrics, tableNameWithType);
INSTANCE_TO_LEASE_EXTENDER.put(instanceId, leaseExtender);
}
return leaseExtender;
}
- private SegmentBuildTimeLeaseExtender(String instanceId, ServerMetrics serverMetrics) {
+ private SegmentBuildTimeLeaseExtender(String instanceId, ServerMetrics serverMetrics, String tableNameWithType) {
_instanceId = instanceId;
- _protocolHandler = new ServerSegmentCompletionProtocolHandler(serverMetrics);
+ _protocolHandler = new ServerSegmentCompletionProtocolHandler(serverMetrics, tableNameWithType);
_executor = new ScheduledThreadPoolExecutor(1);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java
index 8a3b0fb..07b0087 100644
--- a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java
@@ -19,15 +19,20 @@
package org.apache.pinot.server.realtime;
import com.google.common.annotations.VisibleForTesting;
+import java.util.Map;
+import java.util.Set;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
+import org.apache.helix.model.ExternalView;
import org.apache.pinot.core.query.utils.Pair;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME;
+
// Helix keeps the old controller around for 30s before electing a new one, so we will keep getting
// the old controller as leader, and it will keep returning NOT_LEADER.
@@ -78,35 +83,106 @@ public class ControllerLeaderLocator {
/**
* Locate the controller leader so that we can send LLC segment completion requests to it.
* Checks the {@link ControllerLeaderLocator::_cachedControllerLeaderInvalid} flag and fetches the leader from helix if cached value is invalid
- *
+ * @param rawTableName table name without type.
* @return The host:port string of the current controller leader.
*/
- public synchronized Pair<String, Integer> getControllerLeader() {
+ public synchronized Pair<String, Integer> getControllerLeader(String rawTableName) {
// NOTE(review): the cached leader is a single value that is not keyed by rawTableName, so while the
// cache is valid every table is answered with the same controller, even though getLeaderForTable()
// resolves the leader per table. TODO confirm whether a per-table cache is needed.
if (!_cachedControllerLeaderInvalid) {
return _controllerLeaderHostPort;
}
+ String leaderForTable = getLeaderForTable(rawTableName); // cache is invalid: look the leader up again
+ if (leaderForTable == null) {
+ LOGGER.warn("Failed to find a leader for Table: {}", rawTableName);
+ _cachedControllerLeaderInvalid = true; // keep the cache invalid so the next call retries the lookup
+ return null;
+ } else {
+ _controllerLeaderHostPort = generateControllerLeaderHostPortPair(leaderForTable);
+ _cachedControllerLeaderInvalid = false; // later calls return the cached pair until it is invalidated
+ LOGGER.info("Setting controller leader to be {}:{}", _controllerLeaderHostPort.getFirst(),
+ _controllerLeaderHostPort.getSecond());
+ return _controllerLeaderHostPort;
+ }
+ }
+
+ /**
+  * Resolves the controller that should handle realtime segment completion for a table.
+  * The partition leader from the lead controller resource is preferred; when none is found,
+  * the Helix cluster leader is used instead.
+  *
+  * @param rawTableName table name without type
+  * @return the leader for this table, or null if neither lookup finds one.
+  */
+ private String getLeaderForTable(String rawTableName) {
+   ExternalView externalView =
+       _helixManager.getClusterManagmentTool().getResourceExternalView(_clusterName, LEAD_CONTROLLER_RESOURCE_NAME);
+   String partitionLeader = getPartitionLeader(externalView, rawTableName);
+   if (partitionLeader != null) {
+     return partitionLeader;
+   }
+   // No partition leader: fall back to the Helix cluster leader, which may itself be null.
+   return getHelixClusterLeader();
+ }
+
+ /**
+  * Gets the partition leader for a table from the lead controller resource.
+  * The table is mapped to a partition by hashing its raw name over the number of partitions in the
+  * external view; the instance in "MASTER" state for that partition is returned.
+  * If the resource is disabled or no controller registered as participant, there is no instance in "MASTER" state.
+  *
+  * @param leadControllerResourceExternalView external view of lead controller resource, may be null
+  * @param rawTableName table name without type
+  * @return leader of partition, null if not found.
+  */
+ private String getPartitionLeader(ExternalView leadControllerResourceExternalView, String rawTableName) {
+   if (leadControllerResourceExternalView == null) {
+     return null;
+   }
+   Set<String> partitionSet = leadControllerResourceExternalView.getPartitionSet();
+   if (partitionSet == null || partitionSet.isEmpty()) {
+     return null;
+   }
+   // String.hashCode() can be negative, and a plain % would then yield a negative index and a
+   // partition name that does not exist. floorMod keeps the index within [0, numPartitions).
+   // NOTE(review): this mapping must agree with how the controller side assigns tables to partitions - confirm.
+   int numPartitions = partitionSet.size();
+   int partitionIndex = Math.floorMod(rawTableName.hashCode(), numPartitions);
+   String partitionName = LEAD_CONTROLLER_RESOURCE_NAME + "_" + partitionIndex;
+   Map<String, String> stateMap = leadControllerResourceExternalView.getStateMap(partitionName);
+   if (stateMap == null) {
+     // The external view has no entry for this partition, so no leader can be resolved from it.
+     return null;
+   }
+
+   for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+     if ("MASTER".equals(entry.getValue())) {
+       return entry.getKey();
+     }
+   }
+   return null;
+ }
+
+ /**
+ * Gets Helix leader in the cluster. Null if there is no leader.
+ * @return Helix leader.
+ */
+ private String getHelixClusterLeader() {
BaseDataAccessor<ZNRecord> dataAccessor = _helixManager.getHelixDataAccessor().getBaseDataAccessor();
Stat stat = new Stat();
try {
- ZNRecord znRecord =
- dataAccessor.get("/" + _clusterName + "/CONTROLLER/LEADER", stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
- String leader = znRecord.getId();
- int index = leader.lastIndexOf('_');
- String leaderHost = leader.substring(0, index);
- int leaderPort = Integer.valueOf(leader.substring(index + 1));
- _controllerLeaderHostPort = new Pair<>(leaderHost, leaderPort);
- _cachedControllerLeaderInvalid = false;
- LOGGER.info("Setting controller leader to be {}:{} as per znode version {}, mtime {}", leaderHost, leaderPort,
- stat.getVersion(), stat.getMtime());
- return _controllerLeaderHostPort;
+ ZNRecord znRecord = dataAccessor.get("/" + _clusterName + "/CONTROLLER/LEADER", stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
+ String helixLeader = znRecord.getId();
+ LOGGER.info("Getting Helix leader: {} as per znode version {}, mtime {}", helixLeader, stat.getVersion(), stat.getMtime());
+ return helixLeader;
} catch (Exception e) {
- LOGGER.warn("Could not locate controller leader, exception", e);
- _cachedControllerLeaderInvalid = true;
+ LOGGER.warn("Could not locate Helix leader", e);
return null;
}
}
+ /**
+  * Splits a controller id of the form {@code <host>_<port>} into a host/port pair.
+  * The split happens at the last underscore, so the host part may itself contain underscores.
+  *
+  * @param controllerLeaderId controller id as returned by the leader lookup
+  * @return pair of (host, port)
+  * @throws IllegalArgumentException if the id has no underscore or the port is not an integer
+  */
+ private Pair<String, Integer> generateControllerLeaderHostPortPair(String controllerLeaderId) {
+   int index = controllerLeaderId.lastIndexOf('_');
+   if (index < 0) {
+     // Without this check substring(0, -1) fails with an index error that does not mention the id.
+     throw new IllegalArgumentException("Controller leader id is not of the form host_port: " + controllerLeaderId);
+   }
+   String leaderHost = controllerLeaderId.substring(0, index);
+   // parseInt yields the primitive directly instead of boxing and immediately unboxing.
+   int leaderPort = Integer.parseInt(controllerLeaderId.substring(index + 1));
+   return new Pair<>(leaderHost, leaderPort);
+ }
+
/**
* Invalidates the cached controller leader value by setting the {@link ControllerLeaderLocator::_cachedControllerLeaderInvalid} flag.
* This flag is always checked first by {@link ControllerLeaderLocator::getControllerLeader()} method before returning the leader. If set, leader is fetched from helix, else cached leader value is returned.
diff --git a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
index bcfa7ee..5c75f1e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
@@ -23,6 +23,7 @@ import java.net.URI;
import java.util.Map;
import javax.net.ssl.SSLContext;
import org.apache.commons.configuration.Configuration;
+import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
@@ -51,6 +52,7 @@ public class ServerSegmentCompletionProtocolHandler {
private final FileUploadDownloadClient _fileUploadDownloadClient;
private final ServerMetrics _serverMetrics;
+ private final String _rawTableName;
public static void init(Configuration uploaderConfig) {
Configuration httpsConfig = uploaderConfig.subset(HTTPS_PROTOCOL);
@@ -62,9 +64,10 @@ public class ServerSegmentCompletionProtocolHandler {
uploaderConfig.getInt(CONFIG_OF_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS, DEFAULT_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS);
}
- public ServerSegmentCompletionProtocolHandler(ServerMetrics serverMetrics) {
+ public ServerSegmentCompletionProtocolHandler(ServerMetrics serverMetrics, String tableNameWithType) {
_fileUploadDownloadClient = new FileUploadDownloadClient(_sslContext);
_serverMetrics = serverMetrics;
+ _rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
}
public SegmentCompletionProtocol.Response segmentCommitStart(SegmentCompletionProtocol.Request.Params params) {
@@ -159,7 +162,7 @@ public class ServerSegmentCompletionProtocolHandler {
private String createSegmentCompletionUrl(SegmentCompletionProtocol.Request request) {
ControllerLeaderLocator leaderLocator = ControllerLeaderLocator.getInstance();
- final Pair<String, Integer> leaderHostPort = leaderLocator.getControllerLeader();
+ final Pair<String, Integer> leaderHostPort = leaderLocator.getControllerLeader(_rawTableName);
if (leaderHostPort == null) {
LOGGER.warn("No leader found while trying to send {}", request.toString());
return null;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index 3d7bfce..8ff15b8 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -111,7 +111,7 @@ public class LLRealtimeSegmentDataManagerTest {
private RealtimeTableDataManager createTableDataManager() {
final String instanceId = "server-1";
- SegmentBuildTimeLeaseExtender.create(instanceId, new ServerMetrics(new MetricsRegistry()));
+ SegmentBuildTimeLeaseExtender.create(instanceId, new ServerMetrics(new MetricsRegistry()), _tableName);
RealtimeTableDataManager tableDataManager = mock(RealtimeTableDataManager.class);
when(tableDataManager.getServerInstance()).thenReturn(instanceId);
RealtimeSegmentStatsHistory statsHistory = mock(RealtimeSegmentStatsHistory.class);
diff --git a/pinot-core/src/test/java/org/apache/pinot/server/realtime/ControllerLeaderLocatorTest.java b/pinot-core/src/test/java/org/apache/pinot/server/realtime/ControllerLeaderLocatorTest.java
index 34e955d..5f0cdbc 100644
--- a/pinot-core/src/test/java/org/apache/pinot/server/realtime/ControllerLeaderLocatorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/server/realtime/ControllerLeaderLocatorTest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.server.realtime;
import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
@@ -35,6 +36,7 @@ import static org.mockito.Mockito.when;
public class ControllerLeaderLocatorTest {
+ private String testTable = "testTable";
/**
* Tests the invalidate logic for cached controller leader
@@ -45,6 +47,7 @@ public class ControllerLeaderLocatorTest {
HelixManager helixManager = mock(HelixManager.class);
HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
BaseDataAccessor<ZNRecord> baseDataAccessor = mock(BaseDataAccessor.class);
+ HelixAdmin helixAdmin = mock(HelixAdmin.class);
ZNRecord znRecord = mock(ZNRecord.class);
final String leaderHost = "host";
final int leaderPort = 12345;
@@ -54,6 +57,8 @@ public class ControllerLeaderLocatorTest {
when(znRecord.getId()).thenReturn(leaderHost + "_" + leaderPort);
when(baseDataAccessor.get(anyString(), any(), anyInt())).thenReturn(znRecord);
when(helixManager.getClusterName()).thenReturn("testCluster");
+ when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
+ when(helixAdmin.getResourceExternalView(anyString(), anyString())).thenReturn(null);
// Create Controller Leader Locator
FakeControllerLeaderLocator.create(helixManager);
@@ -78,7 +83,7 @@ public class ControllerLeaderLocatorTest {
Assert.assertEquals(controllerLeaderLocator.getLastCacheInvalidateMillis(), lastCacheInvalidateMillis);
// getControllerLeader, which validates the cache
- controllerLeaderLocator.getControllerLeader();
+ controllerLeaderLocator.getControllerLeader(testTable);
Assert.assertFalse(controllerLeaderLocator.isCachedControllerLeaderInvalid());
Assert.assertEquals(controllerLeaderLocator.getLastCacheInvalidateMillis(), lastCacheInvalidateMillis);
@@ -104,16 +109,19 @@ public class ControllerLeaderLocatorTest {
HelixManager helixManager = mock(HelixManager.class);
HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
BaseDataAccessor<ZNRecord> baseDataAccessor = mock(BaseDataAccessor.class);
+ HelixAdmin helixAdmin = mock(HelixAdmin.class);
when(helixManager.getHelixDataAccessor()).thenReturn(helixDataAccessor);
when(helixDataAccessor.getBaseDataAccessor()).thenReturn(baseDataAccessor);
when(baseDataAccessor.get(anyString(), (Stat) any(), anyInt())).thenThrow(new RuntimeException());
+ when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
+ when(helixAdmin.getResourceExternalView(anyString(), anyString())).thenReturn(null);
// Create Controller Leader Locator
FakeControllerLeaderLocator.create(helixManager);
ControllerLeaderLocator controllerLeaderLocator = FakeControllerLeaderLocator.getInstance();
- Assert.assertEquals(controllerLeaderLocator.getControllerLeader(), null);
+ Assert.assertEquals(controllerLeaderLocator.getControllerLeader(testTable), null);
}
@Test
@@ -121,6 +129,7 @@ public class ControllerLeaderLocatorTest {
HelixManager helixManager = mock(HelixManager.class);
HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
BaseDataAccessor<ZNRecord> baseDataAccessor = mock(BaseDataAccessor.class);
+ HelixAdmin helixAdmin = mock(HelixAdmin.class);
ZNRecord znRecord = mock(ZNRecord.class);
final String leaderHost = "host";
final int leaderPort = 12345;
@@ -130,14 +139,16 @@ public class ControllerLeaderLocatorTest {
when(znRecord.getId()).thenReturn(leaderHost + "_" + leaderPort);
when(baseDataAccessor.get(anyString(), (Stat) any(), anyInt())).thenReturn(znRecord);
when(helixManager.getClusterName()).thenReturn("myCluster");
+ when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
+ when(helixAdmin.getResourceExternalView(anyString(), anyString())).thenReturn(null);
// Create Controller Leader Locator
FakeControllerLeaderLocator.create(helixManager);
ControllerLeaderLocator controllerLeaderLocator = FakeControllerLeaderLocator.getInstance();
Pair<String, Integer> expectedLeaderLocation = new Pair<>(leaderHost, leaderPort);
- Assert.assertEquals(controllerLeaderLocator.getControllerLeader().getFirst(), expectedLeaderLocation.getFirst());
- Assert.assertEquals(controllerLeaderLocator.getControllerLeader().getSecond(), expectedLeaderLocation.getSecond());
+ Assert.assertEquals(controllerLeaderLocator.getControllerLeader(testTable).getFirst(), expectedLeaderLocation.getFirst());
+ Assert.assertEquals(controllerLeaderLocator.getControllerLeader(testTable).getSecond(), expectedLeaderLocation.getSecond());
}
static class FakeControllerLeaderLocator extends ControllerLeaderLocator {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.java
index 7e7b041..f6cd400 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.java
@@ -134,7 +134,7 @@ public class SegmentCompletionIntegrationTests extends LLCRealtimeClusterIntegra
// Now report to the controller that we had to stop consumption
ServerSegmentCompletionProtocolHandler protocolHandler =
- new ServerSegmentCompletionProtocolHandler(new ServerMetrics(new MetricsRegistry()));
+ new ServerSegmentCompletionProtocolHandler(new ServerMetrics(new MetricsRegistry()), realtimeTableName);
SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
params.withOffset(45688L).withSegmentName(_currentSegment).withReason("RandomReason")
.withInstanceId(_serverInstance);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org