You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ki...@apache.org on 2019/11/06 06:35:49 UTC
[incubator-pinot] 01/01: Enhance RoutingTable from Map<String, List<String>> to Map<ServerInstance, List<String>>
This is an automated email from the ASF dual-hosted git repository.
kishoreg pushed a commit to branch use_server_instance
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit eec13d444a3f00e24c3e81bfeef4fbba55019e49
Author: kishoreg <g....@gmail.com>
AuthorDate: Tue Nov 5 22:32:39 2019 -0800
Enhance RoutingTable from Map<String, List<String>> to Map<ServerInstance, List<String>>
---
.../requesthandler/BaseBrokerRequestHandler.java | 9 ++--
.../ConnectionPoolBrokerRequestHandler.java | 12 ++---
.../SingleConnectionBrokerRequestHandler.java | 4 +-
.../routing/HelixExternalViewBasedRouting.java | 7 +--
.../apache/pinot/broker/routing/RoutingTable.java | 3 +-
.../builder/BalancedRandomRoutingTableBuilder.java | 11 +++--
.../BasePartitionAwareRoutingTableBuilder.java | 23 ++++-----
.../routing/builder/BaseRoutingTableBuilder.java | 43 +++++++++--------
.../builder/DefaultOfflineRoutingTableBuilder.java | 5 +-
.../DefaultRealtimeRoutingTableBuilder.java | 5 +-
.../builder/GeneratorBasedRoutingTableBuilder.java | 56 +++++++++++-----------
.../HighLevelConsumerBasedRoutingTableBuilder.java | 29 +++++------
.../LowLevelConsumerRoutingTableBuilder.java | 11 +++--
.../PartitionAwareOfflineRoutingTableBuilder.java | 9 ++--
.../PartitionAwareRealtimeRoutingTableBuilder.java | 11 +++--
.../routing/builder/RoutingTableBuilder.java | 5 +-
.../broker/broker/HelixBrokerStarterTest.java | 3 +-
.../broker/routing/RandomRoutingTableTest.java | 3 +-
.../pinot/broker/routing/RoutingTableTest.java | 5 +-
.../BalancedRandomRoutingTableBuilderTest.java | 11 +++--
.../HighLevelConsumerRoutingTableBuilderTest.java | 3 +-
.../LargeClusterRoutingTableBuilderTest.java | 36 +++++++-------
.../LowLevelConsumerRoutingTableBuilderTest.java | 17 +++----
...rtitionAwareOfflineRoutingTableBuilderTest.java | 11 +++--
...titionAwareRealtimeRoutingTableBuilderTest.java | 9 ++--
.../pinot/common/response/ServerInstance.java | 5 ++
.../apache/pinot/core/transport/QueryRouter.java | 15 +++---
.../org/apache/pinot/core/transport/Server.java | 6 +++
.../pinot/core/transport/QueryRouterTest.java | 5 +-
.../transport/config/PerTableRoutingConfig.java | 7 +--
.../transport/scattergather/ScatterGatherImpl.java | 6 +--
.../scattergather/ScatterGatherRequest.java | 3 +-
.../transport/perf/ScatterGatherPerfClient.java | 4 +-
.../transport/scattergather/ScatterGatherTest.java | 18 +++----
34 files changed, 226 insertions(+), 184 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 834148d..ac3382e 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -53,6 +53,7 @@ import org.apache.pinot.common.request.FilterOperator;
import org.apache.pinot.common.request.FilterQuery;
import org.apache.pinot.common.request.FilterQueryMap;
import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.ServerInstance;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.CommonConstants.Broker;
@@ -261,8 +262,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
// Calculate routing table for the query
long routingStartTimeNs = System.nanoTime();
- Map<String, List<String>> offlineRoutingTable = null;
- Map<String, List<String>> realtimeRoutingTable = null;
+ Map<ServerInstance, List<String>> offlineRoutingTable = null;
+ Map<ServerInstance, List<String>> realtimeRoutingTable = null;
if (offlineBrokerRequest != null) {
offlineRoutingTable = _routingTable.getRoutingTable(new RoutingTableLookupRequest(offlineBrokerRequest));
if (offlineRoutingTable.isEmpty()) {
@@ -577,8 +578,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
* Processes the optimized broker requests for both OFFLINE and REALTIME table.
*/
protected abstract BrokerResponse processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
- @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<String, List<String>> offlineRoutingTable,
- @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<String, List<String>> realtimeRoutingTable,
+ @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<ServerInstance, List<String>> offlineRoutingTable,
+ @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable,
long timeoutMs, ServerStats serverStats, RequestStatistics requestStatistics)
throws Exception;
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
index 60a4312..aeaa44e 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
@@ -131,8 +131,8 @@ public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler
@Override
protected BrokerResponse processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
- @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<String, List<String>> offlineRoutingTable,
- @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<String, List<String>> realtimeRoutingTable,
+ @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<ServerInstance, List<String>> offlineRoutingTable,
+ @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable,
long timeoutMs, ServerStats serverStats, RequestStatistics requestStatistics)
throws Exception {
ScatterGatherStats scatterGatherStats = new ScatterGatherStats();
@@ -238,7 +238,7 @@ public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler
* @return composite future used to gather responses.
*/
private CompositeFuture<byte[]> scatterBrokerRequest(long requestId, BrokerRequest brokerRequest,
- Map<String, List<String>> routingTable, boolean isOfflineTable, long timeoutMs,
+ Map<ServerInstance, List<String>> routingTable, boolean isOfflineTable, long timeoutMs,
ScatterGatherStats scatterGatherStats, PhaseTimes phaseTimes)
throws InterruptedException {
long scatterStartTimeNs = System.nanoTime();
@@ -355,12 +355,12 @@ public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler
private static class ScatterGatherRequestImpl implements ScatterGatherRequest {
private final BrokerRequest _brokerRequest;
- private final Map<String, List<String>> _routingTable;
+ private final Map<ServerInstance, List<String>> _routingTable;
private final long _requestId;
private final long _requestTimeoutMs;
private final String _brokerId;
- public ScatterGatherRequestImpl(BrokerRequest request, Map<String, List<String>> routingTable, long requestId,
+ public ScatterGatherRequestImpl(BrokerRequest request, Map<ServerInstance, List<String>> routingTable, long requestId,
long requestTimeoutMs, String brokerId) {
_brokerRequest = request;
_routingTable = routingTable;
@@ -370,7 +370,7 @@ public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler
}
@Override
- public Map<String, List<String>> getRoutingTable() {
+ public Map<ServerInstance, List<String>> getRoutingTable() {
return _routingTable;
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index d7a4890..0674af2 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -71,8 +71,8 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl
@Override
protected BrokerResponse processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
- @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<String, List<String>> offlineRoutingTable,
- @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<String, List<String>> realtimeRoutingTable,
+ @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<ServerInstance, List<String>> offlineRoutingTable,
+ @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable,
long timeoutMs, ServerStats serverStats, RequestStatistics requestStatistics)
throws Exception {
assert offlineBrokerRequest != null || realtimeBrokerRequest != null;
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedRouting.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedRouting.java
index 63ab545..857f44c 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedRouting.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedRouting.java
@@ -50,6 +50,7 @@ import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.BrokerTimer;
+import org.apache.pinot.common.response.ServerInstance;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.EqualityUtils;
import org.apache.pinot.common.utils.JsonUtils;
@@ -112,7 +113,7 @@ public class HelixExternalViewBasedRouting implements ClusterChangeHandler, Rout
}
@Override
- public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request) {
+ public Map<ServerInstance, List<String>> getRoutingTable(RoutingTableLookupRequest request) {
String tableName = request.getTableName();
RoutingTableBuilder routingTableBuilder = _routingTableBuilderMap.get(tableName);
return routingTableBuilder.getRoutingTable(request, _segmentSelectorMap.get(tableName));
@@ -609,8 +610,8 @@ public class HelixExternalViewBasedRouting implements ClusterChangeHandler, Rout
ArrayNode entries = JsonUtils.newArrayNode();
RoutingTableBuilder routingTableBuilder = _routingTableBuilderMap.get(currentTable);
- List<Map<String, List<String>>> routingTables = routingTableBuilder.getRoutingTables();
- for (Map<String, List<String>> routingTable : routingTables) {
+ List<Map<ServerInstance, List<String>>> routingTables = routingTableBuilder.getRoutingTables();
+ for (Map<ServerInstance, List<String>> routingTable : routingTables) {
entries.add(JsonUtils.objectToJsonNode(routingTable));
}
tableEntry.set("routingTableEntries", entries);
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingTable.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingTable.java
index 5cad348..18717ff 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingTable.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingTable.java
@@ -20,6 +20,7 @@ package org.apache.pinot.broker.routing;
import java.util.List;
import java.util.Map;
+import org.apache.pinot.common.response.ServerInstance;
/**
@@ -33,7 +34,7 @@ public interface RoutingTable {
* @param request Routing table lookup request
* @return Map from server to list of segments
*/
- Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request);
+ Map<ServerInstance, List<String>> getRoutingTable(RoutingTableLookupRequest request);
/**
* Return whether the routing table for the given table exists.
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilder.java
index 3fb1e2d..761be58 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilder.java
@@ -28,6 +28,7 @@ import org.apache.helix.ZNRecord;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.ServerInstance;
/**
@@ -46,15 +47,15 @@ public class BalancedRandomRoutingTableBuilder extends BaseRoutingTableBuilder {
_numRoutingTables = configuration.getInt(NUM_ROUTING_TABLES_KEY, DEFAULT_NUM_ROUTING_TABLES);
}
- protected List<Map<String, List<String>>> computeRoutingTablesFromSegmentToServersMap(
- Map<String, List<String>> segmentToServersMap) {
- List<Map<String, List<String>>> routingTables = new ArrayList<>(_numRoutingTables);
+ protected List<Map<ServerInstance, List<String>>> computeRoutingTablesFromSegmentToServersMap(
+ Map<String, List<ServerInstance>> segmentToServersMap) {
+ List<Map<ServerInstance, List<String>>> routingTables = new ArrayList<>(_numRoutingTables);
Set<String> segmentsToQuery = segmentToServersMap.keySet();
for (int i = 0; i < _numRoutingTables; i++) {
- Map<String, List<String>> routingTable = new HashMap<>();
+ Map<ServerInstance, List<String>> routingTable = new HashMap<>();
for (String segmentName : segmentsToQuery) {
- List<String> servers = segmentToServersMap.get(segmentName);
+ List<ServerInstance> servers = segmentToServersMap.get(segmentName);
routingTable.get(getServerWithLeastSegmentsAssigned(servers, routingTable)).add(segmentName);
}
routingTables.add(routingTable);
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BasePartitionAwareRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BasePartitionAwareRoutingTableBuilder.java
index 7b8b182..4d4f465 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BasePartitionAwareRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BasePartitionAwareRoutingTableBuilder.java
@@ -37,6 +37,7 @@ import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.ServerInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,7 +63,7 @@ public abstract class BasePartitionAwareRoutingTableBuilder implements RoutingTa
// Map from segment name to map from replica id to server
// Set variable as volatile so all threads can get the up-to-date map
- protected volatile Map<String, Map<Integer, String>> _segmentToReplicaToServerMap;
+ protected volatile Map<String, Map<Integer, ServerInstance>> _segmentToReplicaToServerMap;
// Cache for segment zk metadata to reduce the lookup to ZK store
protected Map<String, SegmentZKMetadata> _segmentToZkMetadataMapping = new ConcurrentHashMap<>();
@@ -87,9 +88,9 @@ public abstract class BasePartitionAwareRoutingTableBuilder implements RoutingTa
}
@Override
- public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) {
+ public Map<ServerInstance, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) {
// Copy the reference for the current segment to replica to server mapping for snapshot
- Map<String, Map<Integer, String>> segmentToReplicaToServerMap = _segmentToReplicaToServerMap;
+ Map<String, Map<Integer, ServerInstance>> segmentToReplicaToServerMap = _segmentToReplicaToServerMap;
// Get all available segments for table
Set<String> segmentsToQuery = segmentToReplicaToServerMap.keySet();
@@ -99,7 +100,7 @@ public abstract class BasePartitionAwareRoutingTableBuilder implements RoutingTa
segmentsToQuery = segmentSelector.selectSegments(request, segmentsToQuery);
}
- Map<String, List<String>> routingTable = new HashMap<>();
+ Map<ServerInstance, List<String>> routingTable = new HashMap<>();
SegmentPrunerContext prunerContext = new SegmentPrunerContext(request.getBrokerRequest());
// Shuffle the replica group ids in order to satisfy:
@@ -119,19 +120,19 @@ public abstract class BasePartitionAwareRoutingTableBuilder implements RoutingTa
if (!segmentPruned) {
// 2b. Segment cannot be pruned. Assign the segment to a server based on the shuffled replica group ids
- Map<Integer, String> replicaIdToServerMap = segmentToReplicaToServerMap.get(segmentName);
+ Map<Integer, ServerInstance> replicaIdToServerMap = segmentToReplicaToServerMap.get(segmentName);
- String serverName = null;
+ ServerInstance serverInstance = null;
for (int i = 0; i < _numReplicas; i++) {
- serverName = replicaIdToServerMap.get(shuffledReplicaGroupIds[i]);
+ serverInstance = replicaIdToServerMap.get(shuffledReplicaGroupIds[i]);
// If a server is found, update routing table for the current segment
- if (serverName != null) {
+ if (serverInstance != null) {
break;
}
}
- if (serverName != null) {
- routingTable.computeIfAbsent(serverName, k -> new ArrayList<>()).add(segmentName);
+ if (serverInstance != null) {
+ routingTable.computeIfAbsent(serverInstance, k -> new ArrayList<>()).add(segmentName);
} else {
// No server is found for this segment if the code reach here
@@ -145,7 +146,7 @@ public abstract class BasePartitionAwareRoutingTableBuilder implements RoutingTa
}
@Override
- public List<Map<String, List<String>>> getRoutingTables() {
+ public List<Map<ServerInstance, List<String>>> getRoutingTables() {
throw new UnsupportedOperationException("Partition aware routing table cannot be pre-computed");
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BaseRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BaseRoutingTableBuilder.java
index 7c63d43..7418f64 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BaseRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BaseRoutingTableBuilder.java
@@ -36,6 +36,7 @@ import org.apache.pinot.common.config.RoutingConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.ServerInstance;
import org.apache.pinot.common.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,10 +55,10 @@ public abstract class BaseRoutingTableBuilder implements RoutingTableBuilder {
// Set variable as volatile so all threads can get the up-to-date routing tables
// Routing tables are used for storing pre-computed routing table
- protected volatile List<Map<String, List<String>>> _routingTables;
+ protected volatile List<Map<ServerInstance, List<String>>> _routingTables;
// A mapping of segments to servers is used for dynamic routing table building process
- protected volatile Map<String, List<String>> _segmentToServersMap;
+ protected volatile Map<String, List<ServerInstance>> _segmentToServersMap;
@Override
public void init(Configuration configuration, TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord> propertyStore,
@@ -76,13 +77,13 @@ public abstract class BaseRoutingTableBuilder implements RoutingTableBuilder {
}
}
- protected static String getServerWithLeastSegmentsAssigned(List<String> servers,
- Map<String, List<String>> routingTable) {
+ protected static ServerInstance getServerWithLeastSegmentsAssigned(List<ServerInstance> servers,
+ Map<ServerInstance, List<String>> routingTable) {
Collections.shuffle(servers);
- String selectedServer = null;
+ ServerInstance selectedServer = null;
int minNumSegmentsAssigned = Integer.MAX_VALUE;
- for (String server : servers) {
+ for (ServerInstance server : servers) {
List<String> segments = routingTable.get(server);
if (segments == null) {
routingTable.put(server, new ArrayList<>());
@@ -114,7 +115,7 @@ public abstract class BaseRoutingTableBuilder implements RoutingTableBuilder {
@Override
public void computeOnExternalViewChange(String tableName, ExternalView externalView,
List<InstanceConfig> instanceConfigs) {
- Map<String, List<String>> segmentToServersMap =
+ Map<String, List<ServerInstance>> segmentToServersMap =
computeSegmentToServersMapFromExternalView(externalView, instanceConfigs);
if (_enableDynamicComputing) {
@@ -122,15 +123,15 @@ public abstract class BaseRoutingTableBuilder implements RoutingTableBuilder {
_segmentToServersMap = segmentToServersMap;
} else {
// Otherwise, we cache the pre-computed routing tables
- List<Map<String, List<String>>> routingTables = computeRoutingTablesFromSegmentToServersMap(segmentToServersMap);
+ List<Map<ServerInstance, List<String>>> routingTables = computeRoutingTablesFromSegmentToServersMap(segmentToServersMap);
_routingTables = routingTables;
}
}
- public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) {
+ public Map<ServerInstance, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) {
if (_enableDynamicComputing) {
// Copy the pointer for snapshot since the pointer for segment to servers map can change at anytime
- Map<String, List<String>> segmentToServersMap = _segmentToServersMap;
+ Map<String, List<ServerInstance>> segmentToServersMap = _segmentToServersMap;
// Selecting segments only required for processing a query
Set<String> segmentsToQuery = segmentToServersMap.keySet();
@@ -147,7 +148,7 @@ public abstract class BaseRoutingTableBuilder implements RoutingTableBuilder {
}
@Override
- public List<Map<String, List<String>>> getRoutingTables() {
+ public List<Map<ServerInstance, List<String>>> getRoutingTables() {
return _routingTables;
}
@@ -158,12 +159,12 @@ public abstract class BaseRoutingTableBuilder implements RoutingTableBuilder {
* @param segmentsToQuery a list of segments that need to be processed for a particular query
* @return a routing table
*/
- public Map<String, List<String>> computeDynamicRoutingTable(Map<String, List<String>> segmentToServersMap,
+ public Map<ServerInstance, List<String>> computeDynamicRoutingTable(Map<String, List<ServerInstance>> segmentToServersMap,
Set<String> segmentsToQuery) {
- Map<String, List<String>> routingTable = new HashMap<>();
+ Map<ServerInstance, List<String>> routingTable = new HashMap<>();
for (String segmentName : segmentsToQuery) {
- List<String> servers = segmentToServersMap.get(segmentName);
- String selectedServer = servers.get(_random.nextInt(servers.size()));
+ List<ServerInstance> servers = segmentToServersMap.get(segmentName);
+ ServerInstance selectedServer = servers.get(_random.nextInt(servers.size()));
List<String> segments = routingTable.computeIfAbsent(selectedServer, k -> new ArrayList<>());
segments.add(segmentName);
}
@@ -178,18 +179,18 @@ public abstract class BaseRoutingTableBuilder implements RoutingTableBuilder {
* @param instanceConfigs a list of instance config
* @return a mapping of segment to servers
*/
- protected Map<String, List<String>> computeSegmentToServersMapFromExternalView(ExternalView externalView,
+ protected Map<String, List<ServerInstance>> computeSegmentToServersMapFromExternalView(ExternalView externalView,
List<InstanceConfig> instanceConfigs) {
- Map<String, List<String>> segmentToServersMap = new HashMap<>();
+ Map<String, List<ServerInstance>> segmentToServersMap = new HashMap<>();
RoutingTableInstancePruner instancePruner = new RoutingTableInstancePruner(instanceConfigs);
for (String segmentName : externalView.getPartitionSet()) {
// List of servers that are active and are serving the segment
- List<String> servers = new ArrayList<>();
+ List<ServerInstance> servers = new ArrayList<>();
for (Map.Entry<String, String> entry : externalView.getStateMap(segmentName).entrySet()) {
String serverName = entry.getKey();
if (entry.getValue().equals(CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel.ONLINE)
&& !instancePruner.isInactive(serverName)) {
- servers.add(serverName);
+ servers.add(ServerInstance.forInstanceName(serverName));
}
}
if (!servers.isEmpty()) {
@@ -208,6 +209,6 @@ public abstract class BaseRoutingTableBuilder implements RoutingTableBuilder {
* @param segmentToServersMap a mapping of segment to servers
* @return a list of final routing tables
*/
- protected abstract List<Map<String, List<String>>> computeRoutingTablesFromSegmentToServersMap(
- Map<String, List<String>> segmentToServersMap);
+ protected abstract List<Map<ServerInstance, List<String>>> computeRoutingTablesFromSegmentToServersMap(
+ Map<String, List<ServerInstance>> segmentToServersMap);
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/DefaultOfflineRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/DefaultOfflineRoutingTableBuilder.java
index 8bdb9df..7be412b 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/DefaultOfflineRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/DefaultOfflineRoutingTableBuilder.java
@@ -31,6 +31,7 @@ import org.apache.pinot.broker.routing.RoutingTableLookupRequest;
import org.apache.pinot.broker.routing.selector.SegmentSelector;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.ServerInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -126,12 +127,12 @@ public class DefaultOfflineRoutingTableBuilder implements RoutingTableBuilder {
}
@Override
- public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) {
+ public Map<ServerInstance, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) {
return _routingTableBuilder.getRoutingTable(request, segmentSelector);
}
@Override
- public List<Map<String, List<String>>> getRoutingTables() {
+ public List<Map<ServerInstance, List<String>>> getRoutingTables() {
return _routingTableBuilder.getRoutingTables();
}
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/DefaultRealtimeRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/DefaultRealtimeRoutingTableBuilder.java
index 49ffc18..2561cf1 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/DefaultRealtimeRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/DefaultRealtimeRoutingTableBuilder.java
@@ -31,6 +31,7 @@ import org.apache.pinot.broker.routing.RoutingTableLookupRequest;
import org.apache.pinot.broker.routing.selector.SegmentSelector;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.ServerInstance;
import org.apache.pinot.common.utils.SegmentName;
@@ -73,7 +74,7 @@ public class DefaultRealtimeRoutingTableBuilder implements RoutingTableBuilder {
}
@Override
- public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) {
+ public Map<ServerInstance, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) {
boolean forceLLC = false;
boolean forceHLC = false;
for (String routingOption : request.getRoutingOptions()) {
@@ -105,7 +106,7 @@ public class DefaultRealtimeRoutingTableBuilder implements RoutingTableBuilder {
}
@Override
- public List<Map<String, List<String>>> getRoutingTables() {
+ public List<Map<ServerInstance, List<String>>> getRoutingTables() {
if (_hasLLC) {
return _realtimeLLCRoutingTableBuilder.getRoutingTables();
} else if (_hasHLC) {
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/GeneratorBasedRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/GeneratorBasedRoutingTableBuilder.java
index 592bd6e..1dcf8f9 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/GeneratorBasedRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/GeneratorBasedRoutingTableBuilder.java
@@ -28,6 +28,7 @@ import java.util.PriorityQueue;
import java.util.Set;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.response.ServerInstance;
/**
@@ -47,10 +48,11 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable
* Generates a routing table, decorated with a metric.
*
* @return A pair of a routing table and its associated metric.
+ * @param segmentToServersMap
*/
- private Pair<Map<String, List<String>>, Float> generateRoutingTableWithMetric(
- Map<String, List<String>> segmentToServersMap) {
- Map<String, List<String>> routingTable = generateRoutingTable(segmentToServersMap);
+ private Pair<Map<ServerInstance, List<String>>, Float> generateRoutingTableWithMetric(
+ Map<String, List<ServerInstance>> segmentToServersMap) {
+ Map<ServerInstance, List<String>> routingTable = generateRoutingTable(segmentToServersMap);
int segmentCount = 0;
int serverCount = 0;
@@ -73,26 +75,26 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable
return new ImmutablePair<>(routingTable, variance);
}
- private Map<String, List<String>> generateRoutingTable(Map<String, List<String>> segmentToServersMap) {
+ private Map<ServerInstance, List<String>> generateRoutingTable(Map<String, List<ServerInstance>> segmentToServersMap) {
- Map<String, List<String>> routingTable = new HashMap<>();
+ Map<ServerInstance, List<String>> routingTable = new HashMap<>();
if (segmentToServersMap.isEmpty()) {
return routingTable;
}
// Construct the map from server to list of segments
- Map<String, List<String>> serverToSegmentsMap = new HashMap<>();
- for (Map.Entry<String, List<String>> entry : segmentToServersMap.entrySet()) {
- List<String> servers = entry.getValue();
- for (String serverName : servers) {
- List<String> segmentsForServer = serverToSegmentsMap.computeIfAbsent(serverName, k -> new ArrayList<>());
+ Map<ServerInstance, List<String>> serverToSegmentsMap = new HashMap<>();
+ for (Map.Entry<String, List<ServerInstance>> entry : segmentToServersMap.entrySet()) {
+ List<ServerInstance> servers = entry.getValue();
+ for (ServerInstance serverInstance : servers) {
+ List<String> segmentsForServer = serverToSegmentsMap.computeIfAbsent(serverInstance, k -> new ArrayList<>());
segmentsForServer.add(entry.getKey());
}
}
int numSegments = segmentToServersMap.size();
- List<String> servers = new ArrayList<>(serverToSegmentsMap.keySet());
+ List<ServerInstance> servers = new ArrayList<>(serverToSegmentsMap.keySet());
int numServers = servers.size();
// Set of segments that have no instance serving them
@@ -100,7 +102,7 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable
// Set of servers in this routing table
int targetNumServersPerQuery = getTargetNumServersPerQuery();
- Set<String> serversInRoutingTable = new HashSet<>(targetNumServersPerQuery);
+ Set<ServerInstance> serversInRoutingTable = new HashSet<>(targetNumServersPerQuery);
if (numServers <= targetNumServersPerQuery) {
// If there are not enough instances, add them all
@@ -109,7 +111,7 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable
} else {
// Otherwise add _targetNumServersPerQuery instances
while (serversInRoutingTable.size() < targetNumServersPerQuery) {
- String randomServer = servers.get(_random.nextInt(numServers));
+ ServerInstance randomServer = servers.get(_random.nextInt(numServers));
if (!serversInRoutingTable.contains(randomServer)) {
serversInRoutingTable.add(randomServer);
segmentsNotHandledByServers.removeAll(serverToSegmentsMap.get(randomServer));
@@ -122,32 +124,32 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable
String segmentNotHandledByServers = segmentsNotHandledByServers.iterator().next();
// Pick a random server that can serve this segment
- List<String> serversForSegment = segmentToServersMap.get(segmentNotHandledByServers);
- String randomServer = serversForSegment.get(_random.nextInt(serversForSegment.size()));
+ List<ServerInstance> serversForSegment = segmentToServersMap.get(segmentNotHandledByServers);
+ ServerInstance randomServer = serversForSegment.get(_random.nextInt(serversForSegment.size()));
serversInRoutingTable.add(randomServer);
segmentsNotHandledByServers.removeAll(serverToSegmentsMap.get(randomServer));
}
// Sort all the segments to be used during assignment in ascending order of replicas
- PriorityQueue<Pair<String, List<String>>> segmentToReplicaSetQueue =
+ PriorityQueue<Pair<String, List<ServerInstance>>> segmentToReplicaSetQueue =
new PriorityQueue<>(numSegments, Comparator.comparingInt(pair -> pair.getRight().size()));
- for (Map.Entry<String, List<String>> entry : segmentToServersMap.entrySet()) {
+ for (Map.Entry<String, List<ServerInstance>> entry : segmentToServersMap.entrySet()) {
// Servers for the segment is the intersection of all servers for this segment and the servers that we have in
// this routing table
- List<String> serversForSegment = new ArrayList<>(entry.getValue());
+ List<ServerInstance> serversForSegment = new ArrayList<>(entry.getValue());
serversForSegment.retainAll(serversInRoutingTable);
segmentToReplicaSetQueue.add(new ImmutablePair<>(entry.getKey(), serversForSegment));
}
// Assign each segment to a server
- Pair<String, List<String>> segmentServersPair;
+ Pair<String, List<ServerInstance>> segmentServersPair;
while ((segmentServersPair = segmentToReplicaSetQueue.poll()) != null) {
String segmentName = segmentServersPair.getLeft();
- List<String> serversForSegment = segmentServersPair.getRight();
+ List<ServerInstance> serversForSegment = segmentServersPair.getRight();
- String serverWithLeastSegmentsAssigned = getServerWithLeastSegmentsAssigned(serversForSegment, routingTable);
+ ServerInstance serverWithLeastSegmentsAssigned = getServerWithLeastSegmentsAssigned(serversForSegment, routingTable);
List<String> segmentsAssignedToServer =
routingTable.computeIfAbsent(serverWithLeastSegmentsAssigned, k -> new ArrayList<>());
segmentsAssignedToServer.add(segmentName);
@@ -220,8 +222,8 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable
*/
@Override
- protected List<Map<String, List<String>>> computeRoutingTablesFromSegmentToServersMap(
- Map<String, List<String>> segmentToServersMap) {
+ protected List<Map<ServerInstance, List<String>>> computeRoutingTablesFromSegmentToServersMap(
+ Map<String, List<ServerInstance>> segmentToServersMap) {
// The default routing table algorithm tries to balance all available segments across all servers, so that each
// server is hit on every query. This works fine with small clusters (say less than 20 servers) but for larger
// clusters, this adds up to significant overhead (one request must be enqueued for each server, processed,
@@ -260,7 +262,7 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable
// in workload per server across all the routing tables. To do so, we generate an initial set of routing tables
// according to a per-routing table metric and discard the worst routing tables.
- PriorityQueue<Pair<Map<String, List<String>>, Float>> topRoutingTables =
+ PriorityQueue<Pair<Map<ServerInstance, List<String>>, Float>> topRoutingTables =
new PriorityQueue<>(ROUTING_TABLE_COUNT, (left, right) -> {
// Float.compare sorts in ascending order and we want a max heap, so we need to return the negative
// of the comparison
@@ -273,8 +275,8 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable
// Generate more routing tables and keep the ROUTING_TABLE_COUNT top ones
for (int i = 0; i < (ROUTING_TABLE_GENERATION_COUNT - ROUTING_TABLE_COUNT); ++i) {
- Pair<Map<String, List<String>>, Float> newRoutingTable = generateRoutingTableWithMetric(segmentToServersMap);
- Pair<Map<String, List<String>>, Float> worstRoutingTable = topRoutingTables.peek();
+ Pair<Map<ServerInstance, List<String>>, Float> newRoutingTable = generateRoutingTableWithMetric(segmentToServersMap);
+ Pair<Map<ServerInstance, List<String>>, Float> worstRoutingTable = topRoutingTables.peek();
// If the new routing table is better than the worst one, keep it
if (newRoutingTable.getRight() < worstRoutingTable.getRight()) {
@@ -284,7 +286,7 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable
}
// Return the best routing tables
- List<Map<String, List<String>>> routingTables = new ArrayList<>(topRoutingTables.size());
+ List<Map<ServerInstance, List<String>>> routingTables = new ArrayList<>(topRoutingTables.size());
while (!topRoutingTables.isEmpty()) {
routingTables.add(topRoutingTables.poll().getKey());
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/HighLevelConsumerBasedRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/HighLevelConsumerBasedRoutingTableBuilder.java
index e1368e6..1aa3908 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/HighLevelConsumerBasedRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/HighLevelConsumerBasedRoutingTableBuilder.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.response.ServerInstance;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.HLCSegmentName;
import org.apache.pinot.common.utils.SegmentName;
@@ -33,17 +34,17 @@ import org.apache.pinot.common.utils.SegmentName;
public class HighLevelConsumerBasedRoutingTableBuilder extends BaseRoutingTableBuilder {
@Override
- protected Map<String, List<String>> computeSegmentToServersMapFromExternalView(ExternalView externalView,
+ protected Map<String, List<ServerInstance>> computeSegmentToServersMapFromExternalView(ExternalView externalView,
List<InstanceConfig> instanceConfigs) {
- Map<String, List<String>> segmentToServersMap = new HashMap<>();
+ Map<String, List<ServerInstance>> segmentToServersMap = new HashMap<>();
RoutingTableInstancePruner instancePruner = new RoutingTableInstancePruner(instanceConfigs);
for (String segmentName : externalView.getPartitionSet()) {
- List<String> servers = new ArrayList<>();
+ List<ServerInstance> servers = new ArrayList<>();
for (Map.Entry<String, String> entry : externalView.getStateMap(segmentName).entrySet()) {
String serverName = entry.getKey();
if (entry.getValue().equals(CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel.ONLINE)
&& !instancePruner.isInactive(serverName) && SegmentName.isHighLevelConsumerSegmentName(segmentName)) {
- servers.add(serverName);
+ servers.add(ServerInstance.forInstanceName(serverName));
}
}
if (servers.size() != 0) {
@@ -56,20 +57,20 @@ public class HighLevelConsumerBasedRoutingTableBuilder extends BaseRoutingTableB
}
@Override
- protected List<Map<String, List<String>>> computeRoutingTablesFromSegmentToServersMap(
- Map<String, List<String>> segmentsToServerMap) {
- List<Map<String, List<String>>> routingTables = new ArrayList<>();
- Map<String, Map<String, List<String>>> groupIdToRouting = new HashMap<>();
- for (Map.Entry<String, List<String>> entry : segmentsToServerMap.entrySet()) {
+ protected List<Map<ServerInstance, List<String>>> computeRoutingTablesFromSegmentToServersMap(
+ Map<String, List<ServerInstance>> segmentsToServerMap) {
+ List<Map<ServerInstance, List<String>>> routingTables = new ArrayList<>();
+ Map<String, Map<ServerInstance, List<String>>> groupIdToRouting = new HashMap<>();
+ for (Map.Entry<String, List<ServerInstance>> entry : segmentsToServerMap.entrySet()) {
String segmentName = entry.getKey();
HLCSegmentName hlcSegmentName = new HLCSegmentName(segmentName);
String groupId = hlcSegmentName.getGroupId();
- Map<String, List<String>> routingTableForGroupId =
+ Map<ServerInstance, List<String>> routingTableForGroupId =
groupIdToRouting.computeIfAbsent(groupId, k -> new HashMap<>());
- List<String> servers = entry.getValue();
- for (String serverName : servers) {
- List<String> segmentsForServer = routingTableForGroupId.computeIfAbsent(serverName, k -> new ArrayList<>());
+ List<ServerInstance> servers = entry.getValue();
+ for (ServerInstance serverInstance : servers) {
+ List<String> segmentsForServer = routingTableForGroupId.computeIfAbsent(serverInstance, k -> new ArrayList<>());
segmentsForServer.add(segmentName);
}
}
@@ -78,7 +79,7 @@ public class HighLevelConsumerBasedRoutingTableBuilder extends BaseRoutingTableB
}
@Override
- public Map<String, List<String>> computeDynamicRoutingTable(Map<String, List<String>> segmentToServersMap,
+ public Map<ServerInstance, List<String>> computeDynamicRoutingTable(Map<String, List<ServerInstance>> segmentToServersMap,
Set<String> segmentsToQuery) {
throw new UnsupportedOperationException(
"Dynamic routing table computation for high level consumer base routing is not supported");
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilder.java
index 4a39da2..4749e60 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilder.java
@@ -30,6 +30,7 @@ import org.apache.helix.model.InstanceConfig;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.ServerInstance;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCUtils;
import org.apache.pinot.common.utils.SegmentName;
@@ -66,7 +67,7 @@ public class LowLevelConsumerRoutingTableBuilder extends GeneratorBasedRoutingTa
}
@Override
- protected Map<String, List<String>> computeSegmentToServersMapFromExternalView(ExternalView externalView,
+ protected Map<String, List<ServerInstance>> computeSegmentToServersMapFromExternalView(ExternalView externalView,
List<InstanceConfig> instanceConfigs) {
// We build the segment to servers mapping here. What we want to do is to make sure that we uphold
// the guarantees clients expect (no duplicate records, eventual consistency) and spreading the load as equally as
@@ -80,7 +81,7 @@ public class LowLevelConsumerRoutingTableBuilder extends GeneratorBasedRoutingTa
// The upstream code in BaseRoutingTableGenerator will generate routing tables based on taking a subset of servers
// if the cluster is large enough as well as ensure that the best routing tables are used for routing.
- Map<String, List<String>> segmentToServersMap = new HashMap<>();
+ Map<String, List<ServerInstance>> segmentToServersMap = new HashMap<>();
// 1. Gather all segments and group them by partition, sorted by sequence number
Map<String, SortedSet<SegmentName>> sortedSegmentsByStreamPartition =
@@ -103,7 +104,7 @@ public class LowLevelConsumerRoutingTableBuilder extends GeneratorBasedRoutingTa
SegmentName validConsumingSegment = allowedSegmentInConsumingStateByPartition.get(partitionId);
for (SegmentName segmentName : segmentNames) {
- List<String> validServers = new ArrayList<>();
+ List<ServerInstance> validServers = new ArrayList<>();
String segmentNameStr = segmentName.getSegmentName();
Map<String, String> externalViewState = externalView.getStateMap(segmentNameStr);
@@ -118,14 +119,14 @@ public class LowLevelConsumerRoutingTableBuilder extends GeneratorBasedRoutingTa
// Replicas in ONLINE state are always allowed
if (state.equalsIgnoreCase(CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.ONLINE)) {
- validServers.add(instance);
+ validServers.add(ServerInstance.forInstanceName(instance));
continue;
}
// If the server is in CONSUMING status, the segment has to match the valid consuming segment
if (state.equals(CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING)
&& validConsumingSegment != null && segmentNameStr.equals(validConsumingSegment.getSegmentName())) {
- validServers.add(instance);
+ validServers.add(ServerInstance.forInstanceName(instance));
}
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilder.java
index d8df632..c105d97 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilder.java
@@ -38,6 +38,7 @@ import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.ServerInstance;
import org.apache.pinot.common.utils.CommonConstants;
@@ -135,14 +136,14 @@ public class PartitionAwareOfflineRoutingTableBuilder extends BasePartitionAware
}
// 3. Compute the final routing look up table
- Map<String, Map<Integer, String>> segmentToReplicaToServerMap = new HashMap<>();
+ Map<String, Map<Integer, ServerInstance>> segmentToReplicaToServerMap = new HashMap<>();
for (String segmentName : segmentSet) {
// Get partition_id from cached segment zk metadata
SegmentZKMetadata segmentZKMetadata = _segmentToZkMetadataMapping.get(segmentName);
int partitionId = getPartitionId(segmentZKMetadata);
// Initialize intermediate data structures
- Map<Integer, String> replicaToServerMap = new HashMap<>();
+ Map<Integer, ServerInstance> replicaToServerMap = new HashMap<>();
int replicaIdForNoPartitionMetadata = 0;
for (Map.Entry<String, String> entry : externalView.getStateMap(segmentName).entrySet()) {
@@ -151,10 +152,10 @@ public class PartitionAwareOfflineRoutingTableBuilder extends BasePartitionAware
&& !instancePruner.isInactive(serverName)) {
// If there's no partition number in the metadata, assign replica id sequentially.
if (partitionId == NO_PARTITION_NUMBER) {
- replicaToServerMap.put(replicaIdForNoPartitionMetadata++, serverName);
+ replicaToServerMap.put(replicaIdForNoPartitionMetadata++, ServerInstance.forInstanceName(serverName));
} else {
int replicaId = partitionToServerToReplicaMap.get(partitionId).get(serverName);
- replicaToServerMap.put(replicaId, serverName);
+ replicaToServerMap.put(replicaId, ServerInstance.forInstanceName(serverName));
}
}
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilder.java
index f302dc46..c9dbbb6 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilder.java
@@ -34,6 +34,7 @@ import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.ServerInstance;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.LLCUtils;
@@ -84,12 +85,12 @@ public class PartitionAwareRealtimeRoutingTableBuilder extends BasePartitionAwar
RoutingTableInstancePruner instancePruner = new RoutingTableInstancePruner(instanceConfigs);
// Compute map from segment to map from replica to server
- Map<String, Map<Integer, String>> segmentToReplicaToServerMap = new HashMap<>();
+ Map<String, Map<Integer, ServerInstance>> segmentToReplicaToServerMap = new HashMap<>();
for (String segmentName : segmentSet) {
int partitionId = getPartitionId(segmentName);
SegmentName validConsumingSegment = allowedSegmentInConsumingStateByPartition.get(Integer.toString(partitionId));
- Map<Integer, String> replicaToServerMap = new HashMap<>();
+ Map<Integer, ServerInstance> replicaToServerMap = new HashMap<>();
int replicaId = 0;
for (Map.Entry<String, String> entry : externalView.getStateMap(segmentName).entrySet()) {
String serverName = entry.getKey();
@@ -102,13 +103,13 @@ public class PartitionAwareRealtimeRoutingTableBuilder extends BasePartitionAwar
// If the server is in ONLINE status, it's always safe to add
if (state.equals(CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.ONLINE)) {
- replicaToServerMap.put(replicaId++, serverName);
+ replicaToServerMap.put(replicaId++, ServerInstance.forInstanceName(serverName));
}
// If the server is in CONSUMING status, the segment has to match the valid consuming segment
if (state.equals(CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING)
&& validConsumingSegment != null && segmentName.equals(validConsumingSegment.getSegmentName())) {
- replicaToServerMap.put(replicaId++, serverName);
+ replicaToServerMap.put(replicaId++, ServerInstance.forInstanceName(serverName));
}
}
@@ -129,7 +130,7 @@ public class PartitionAwareRealtimeRoutingTableBuilder extends BasePartitionAwar
// Get the unique set of replica ids and find the maximum id to update the number of replicas
Set<Integer> replicaGroupIds = new HashSet<>();
- for (Map<Integer, String> replicaToServer : segmentToReplicaToServerMap.values()) {
+ for (Map<Integer, ServerInstance> replicaToServer : segmentToReplicaToServerMap.values()) {
replicaGroupIds.addAll(replicaToServer.keySet());
}
int numReplicas = Collections.max(replicaGroupIds) + 1;
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/RoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/RoutingTableBuilder.java
index 7745d83..67c59db 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/RoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/RoutingTableBuilder.java
@@ -29,6 +29,7 @@ import org.apache.pinot.broker.routing.RoutingTableLookupRequest;
import org.apache.pinot.broker.routing.selector.SegmentSelector;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.ServerInstance;
/**
@@ -54,10 +55,10 @@ public interface RoutingTableBuilder {
* TODO: we need to consider relocating segment selector into the routing table builder instead of passing it
* from outside.
*/
- Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector);
+ Map<ServerInstance, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector);
/**
* Get all pre-computed routing tables.
*/
- List<Map<String, List<String>>> getRoutingTables();
+ List<Map<ServerInstance, List<String>>> getRoutingTables();
}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index ccc6377..e7c0d4d 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -37,6 +37,7 @@ import org.apache.pinot.common.config.TagNameUtils;
import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
+import org.apache.pinot.common.response.ServerInstance;
import org.apache.pinot.common.utils.CommonConstants.Broker;
import org.apache.pinot.common.utils.CommonConstants.Helix;
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
@@ -138,7 +139,7 @@ public class HelixBrokerStarterTest extends ControllerTest {
assertTrue(routing.routingTableExists(REALTIME_TABLE_NAME));
RoutingTableLookupRequest routingTableLookupRequest = new RoutingTableLookupRequest(OFFLINE_TABLE_NAME);
- Map<String, List<String>> routingTable = routing.getRoutingTable(routingTableLookupRequest);
+ Map<ServerInstance, List<String>> routingTable = routing.getRoutingTable(routingTableLookupRequest);
assertEquals(routingTable.size(), NUM_SERVERS);
assertEquals(routingTable.values().iterator().next().size(), NUM_OFFLINE_SEGMENTS);
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RandomRoutingTableTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RandomRoutingTableTest.java
index 5b5cdd6..8ac1be0 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RandomRoutingTableTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RandomRoutingTableTest.java
@@ -35,6 +35,7 @@ import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableConfig.Builder;
import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.common.response.ServerInstance;
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
import org.mockito.Mockito;
import org.testng.Assert;
@@ -65,7 +66,7 @@ public class RandomRoutingTableTest {
routing.markDataResourceOnline(generateTableConfig(tableName), externalView, instanceConfigs);
for (int i = 0; i < NUM_ROUNDS; i++) {
- Map<String, List<String>> routingTable = routing.getRoutingTable(new RoutingTableLookupRequest(tableName));
+ Map<ServerInstance, List<String>> routingTable = routing.getRoutingTable(new RoutingTableLookupRequest(tableName));
Assert.assertEquals(routingTable.size(), numServersInEV);
int numSegments = 0;
for (List<String> segmentsForServer : routingTable.values()) {
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RoutingTableTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RoutingTableTest.java
index e583799..8d5051c 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RoutingTableTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RoutingTableTest.java
@@ -39,6 +39,7 @@ import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.ServerInstance;
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
import org.apache.pinot.common.utils.HLCSegmentName;
import org.apache.pinot.common.utils.LLCSegmentName;
@@ -150,7 +151,7 @@ public class RoutingTableTest {
private void assertResourceRequest(HelixExternalViewBasedRouting routing, String resource, String expectedSegmentList,
int expectedNumSegment) {
- Map<String, List<String>> routingTable = routing.getRoutingTable(new RoutingTableLookupRequest(resource));
+ Map<ServerInstance, List<String>> routingTable = routing.getRoutingTable(new RoutingTableLookupRequest(resource));
List<String> selectedSegments = new ArrayList<>();
for (List<String> segmentsForServer : routingTable.values()) {
selectedSegments.addAll(segmentsForServer);
@@ -257,7 +258,7 @@ public class RoutingTableTest {
private void assertResourceRequest(HelixExternalViewBasedRouting routing, String resource,
String[] expectedSegmentLists, int expectedNumSegment) {
- Map<String, List<String>> routingTable = routing.getRoutingTable(new RoutingTableLookupRequest(resource));
+ Map<ServerInstance, List<String>> routingTable = routing.getRoutingTable(new RoutingTableLookupRequest(resource));
List<String> selectedSegments = new ArrayList<>();
for (List<String> segmentsForServer : routingTable.values()) {
selectedSegments.addAll(segmentsForServer);
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilderTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilderTest.java
index cc18fec..e9de507 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilderTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilderTest.java
@@ -29,6 +29,7 @@ import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.broker.routing.RoutingTableLookupRequest;
import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.response.ServerInstance;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -50,13 +51,13 @@ public class BalancedRandomRoutingTableBuilderTest {
// Build routing table
routingTableBuilder.computeOnExternalViewChange("dummy", externalView, instanceConfigList);
- List<Map<String, List<String>>> routingTables = routingTableBuilder.getRoutingTables();
+ List<Map<ServerInstance, List<String>>> routingTables = routingTableBuilder.getRoutingTables();
// Check that at least two routing tables are different
- Iterator<Map<String, List<String>>> routingTableIterator = routingTables.iterator();
- Map<String, List<String>> previous = routingTableIterator.next();
+ Iterator<Map<ServerInstance, List<String>>> routingTableIterator = routingTables.iterator();
+ Map<ServerInstance, List<String>> previous = routingTableIterator.next();
while (routingTableIterator.hasNext()) {
- Map<String, List<String>> current = routingTableIterator.next();
+ Map<ServerInstance, List<String>> current = routingTableIterator.next();
if (!current.equals(previous)) {
return;
}
@@ -83,7 +84,7 @@ public class BalancedRandomRoutingTableBuilderTest {
// Build routing table
routingTableBuilder.computeOnExternalViewChange("dummy", externalView, instanceConfigList);
RoutingTableLookupRequest request = new RoutingTableLookupRequest(tableNameWithType);
- Map<String, List<String>> routingTable = routingTableBuilder.getRoutingTable(request, null);
+ Map<ServerInstance, List<String>> routingTable = routingTableBuilder.getRoutingTable(request, null);
Set<String> segmentsInRoutingTable = new HashSet<>();
for (List<String> segments : routingTable.values()) {
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/HighLevelConsumerRoutingTableBuilderTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/HighLevelConsumerRoutingTableBuilderTest.java
index 225f3f8..01676ab 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/HighLevelConsumerRoutingTableBuilderTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/HighLevelConsumerRoutingTableBuilderTest.java
@@ -30,6 +30,7 @@ import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.broker.routing.RoutingTableLookupRequest;
import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.response.ServerInstance;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.HLCSegmentName;
import org.testng.Assert;
@@ -99,7 +100,7 @@ public class HighLevelConsumerRoutingTableBuilderTest {
// Check if the routing table result is correct
for (int run = 0; run < MAX_NUM_GROUPS * 10; run++) {
RoutingTableLookupRequest request = new RoutingTableLookupRequest(tableNameWithType);
- Map<String, List<String>> routingTable = routingTableBuilder.getRoutingTable(request, null);
+ Map<ServerInstance, List<String>> routingTable = routingTableBuilder.getRoutingTable(request, null);
Set<String> coveredSegments = new HashSet<>();
for (List<String> segmentsForServer : routingTable.values()) {
coveredSegments.addAll(segmentsForServer);
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/LargeClusterRoutingTableBuilderTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/LargeClusterRoutingTableBuilderTest.java
index f081211..e1be01c 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/LargeClusterRoutingTableBuilderTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/LargeClusterRoutingTableBuilderTest.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.response.ServerInstance;
import org.apache.pinot.common.utils.CommonConstants;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -41,7 +42,7 @@ public class LargeClusterRoutingTableBuilderTest {
private LargeClusterRoutingTableBuilder _largeClusterRoutingTableBuilder = new LargeClusterRoutingTableBuilder();
private interface RoutingTableValidator {
- boolean isRoutingTableValid(Map<String, List<String>> routingTable, ExternalView externalView,
+ boolean isRoutingTableValid(Map<ServerInstance, List<String>> routingTable, ExternalView externalView,
List<InstanceConfig> instanceConfigs);
}
@@ -49,7 +50,7 @@ public class LargeClusterRoutingTableBuilderTest {
public void testRoutingTableCoversAllSegmentsExactlyOnce() {
validateAssertionOverMultipleRoutingTables(new RoutingTableValidator() {
@Override
- public boolean isRoutingTableValid(Map<String, List<String>> routingTable, ExternalView externalView,
+ public boolean isRoutingTableValid(Map<ServerInstance, List<String>> routingTable, ExternalView externalView,
List<InstanceConfig> instanceConfigs) {
Set<String> unassignedSegments = new HashSet<>();
unassignedSegments.addAll(externalView.getPartitionSet());
@@ -94,12 +95,13 @@ public class LargeClusterRoutingTableBuilderTest {
validateAssertionForOneRoutingTable(new RoutingTableValidator() {
@Override
- public boolean isRoutingTableValid(Map<String, List<String>> routingTable, ExternalView externalView,
+ public boolean isRoutingTableValid(Map<ServerInstance, List<String>> routingTable, ExternalView externalView,
List<InstanceConfig> instanceConfigs) {
- for (String serverName : routingTable.keySet()) {
+ for (ServerInstance serverInstance : routingTable.keySet()) {
// These servers should not appear in the routing table
- if (serverName.equals(disabledHelixInstanceName) || serverName.equals(shuttingDownInstanceName) ||
- serverName.equals(queriesDisabledInstanceName)) {
+ String instanceName = serverInstance.getInstanceName();
+ if (instanceName.equals(disabledHelixInstanceName) || instanceName.equals(shuttingDownInstanceName) ||
+ instanceName.equals(queriesDisabledInstanceName)) {
return false;
}
}
@@ -122,12 +124,12 @@ public class LargeClusterRoutingTableBuilderTest {
_largeClusterRoutingTableBuilder.computeOnExternalViewChange(tableName, externalView, instanceConfigs);
- List<Map<String, List<String>>> routingTables = _largeClusterRoutingTableBuilder.getRoutingTables();
+ List<Map<ServerInstance, List<String>>> routingTables = _largeClusterRoutingTableBuilder.getRoutingTables();
int routingTableCount = 0;
int largerThanDesiredRoutingTableCount = 0;
- for (Map<String, List<String>> routingTable : routingTables) {
+ for (Map<ServerInstance, List<String>> routingTable : routingTables) {
routingTableCount++;
if (desiredServerCount < routingTable.size()) {
largerThanDesiredRoutingTableCount++;
@@ -154,22 +156,22 @@ public class LargeClusterRoutingTableBuilderTest {
_largeClusterRoutingTableBuilder.computeOnExternalViewChange(tableName, externalView, instanceConfigs);
- List<Map<String, List<String>>> routingTables = _largeClusterRoutingTableBuilder.getRoutingTables();
+ List<Map<ServerInstance, List<String>>> routingTables = _largeClusterRoutingTableBuilder.getRoutingTables();
- Map<String, Integer> segmentCountPerServer = new HashMap<>();
+ Map<ServerInstance, Integer> segmentCountPerServer = new HashMap<>();
// Count number of segments assigned per server
- for (Map<String, List<String>> routingTable : routingTables) {
- for (Map.Entry<String, List<String>> entry : routingTable.entrySet()) {
- String serverName = entry.getKey();
- Integer numSegmentsForServer = segmentCountPerServer.get(serverName);
+ for (Map<ServerInstance, List<String>> routingTable : routingTables) {
+ for (Map.Entry<ServerInstance, List<String>> entry : routingTable.entrySet()) {
+ ServerInstance serverInstance = entry.getKey();
+ Integer numSegmentsForServer = segmentCountPerServer.get(serverInstance);
if (numSegmentsForServer == null) {
numSegmentsForServer = 0;
}
numSegmentsForServer += entry.getValue().size();
- segmentCountPerServer.put(serverName, numSegmentsForServer);
+ segmentCountPerServer.put(serverInstance, numSegmentsForServer);
}
}
@@ -253,9 +255,9 @@ public class LargeClusterRoutingTableBuilderTest {
ExternalView externalView, List<InstanceConfig> instanceConfigs, String tableName) {
_largeClusterRoutingTableBuilder.computeOnExternalViewChange(tableName, externalView, instanceConfigs);
- List<Map<String, List<String>>> routingTables = _largeClusterRoutingTableBuilder.getRoutingTables();
+ List<Map<ServerInstance, List<String>>> routingTables = _largeClusterRoutingTableBuilder.getRoutingTables();
- for (Map<String, List<String>> routingTable : routingTables) {
+ for (Map<ServerInstance, List<String>> routingTable : routingTables) {
assertTrue(routingTableValidator.isRoutingTableValid(routingTable, externalView, instanceConfigs), message);
}
}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilderTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilderTest.java
index fc1c713..3ee0820 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilderTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilderTest.java
@@ -30,6 +30,7 @@ import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.common.response.ServerInstance;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
import org.apache.pinot.common.utils.LLCSegmentName;
@@ -135,13 +136,13 @@ public class LowLevelConsumerRoutingTableBuilderTest {
long startTime = System.nanoTime();
routingTableBuilder.computeOnExternalViewChange("table_REALTIME", externalView, instanceConfigs);
- List<Map<String, List<String>>> routingTables = routingTableBuilder.getRoutingTables();
+ List<Map<ServerInstance, List<String>>> routingTables = routingTableBuilder.getRoutingTables();
long endTime = System.nanoTime();
totalNanos += endTime - startTime;
// Check that all routing tables generated match all segments, with no duplicates
- for (Map<String, List<String>> routingTable : routingTables) {
+ for (Map<ServerInstance, List<String>> routingTable : routingTables) {
Set<String> assignedSegments = new HashSet<>();
for (List<String> segmentsForServer : routingTable.values()) {
@@ -198,8 +199,8 @@ public class LowLevelConsumerRoutingTableBuilderTest {
externalView.setState(consumingSegment2, instance2, RealtimeSegmentOnlineOfflineStateModel.CONSUMING);
routingTableBuilder.computeOnExternalViewChange(realtimeTableName, externalView, instanceConfigs);
- List<Map<String, List<String>>> routingTables = routingTableBuilder.getRoutingTables();
- for (Map<String, List<String>> routingTable : routingTables) {
+ List<Map<ServerInstance, List<String>>> routingTables = routingTableBuilder.getRoutingTables();
+ for (Map<ServerInstance, List<String>> routingTable : routingTables) {
ArrayList<String> segmentsInRoutingTable = new ArrayList<>();
for (List<String> segmentsForServer : routingTable.values()) {
segmentsInRoutingTable.addAll(segmentsForServer);
@@ -248,22 +249,22 @@ public class LowLevelConsumerRoutingTableBuilderTest {
}
routingTableBuilder.computeOnExternalViewChange(rawTableName, externalView, instanceConfigs);
- List<Map<String, List<String>>> routingTables = routingTableBuilder.getRoutingTables();
- for (Map<String, List<String>> routingTable : routingTables) {
+ List<Map<ServerInstance, List<String>>> routingTables = routingTableBuilder.getRoutingTables();
+ for (Map<ServerInstance, List<String>> routingTable : routingTables) {
Assert.assertTrue(routingTable.isEmpty());
}
instanceConfig.getRecord().setSimpleField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS, "false");
routingTableBuilder.computeOnExternalViewChange(rawTableName, externalView, instanceConfigs);
routingTables = routingTableBuilder.getRoutingTables();
- for (Map<String, List<String>> routingTable : routingTables) {
+ for (Map<ServerInstance, List<String>> routingTable : routingTables) {
Assert.assertFalse(routingTable.isEmpty());
}
instanceConfig.getRecord().setSimpleField(CommonConstants.Helix.QUERIES_DISABLED, "true");
routingTableBuilder.computeOnExternalViewChange(rawTableName, externalView, instanceConfigs);
routingTables = routingTableBuilder.getRoutingTables();
- for (Map<String, List<String>> routingTable : routingTables) {
+ for (Map<ServerInstance, List<String>> routingTable : routingTables) {
Assert.assertTrue(routingTable.isEmpty());
}
}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java
index 52b2d82..f3f0fa3 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java
@@ -41,6 +41,7 @@ import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.response.ServerInstance;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.pql.parsers.Pql2Compiler;
import org.testng.Assert;
@@ -116,7 +117,7 @@ public class PartitionAwareOfflineRoutingTableBuilderTest {
// Check the query that requires to scan all segment.
String countStarQuery = "select count(*) from myTable";
- Map<String, List<String>> routingTable =
+ Map<ServerInstance, List<String>> routingTable =
routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null);
// Check that the number of servers picked are always equal or less than the number of servers
@@ -217,10 +218,10 @@ public class PartitionAwareOfflineRoutingTableBuilderTest {
// Compute routing table and this should not throw null pointer exception
routingTableBuilder.computeOnExternalViewChange(OFFLINE_TABLE_NAME, newExternalView, instanceConfigs);
- Set<String> servers = new HashSet<>();
+ Set<ServerInstance> servers = new HashSet<>();
for (int i = 0; i < 100; i++) {
String countStarQuery = "select count(*) from " + OFFLINE_TABLE_NAME;
- Map<String, List<String>> routingTable =
+ Map<ServerInstance, List<String>> routingTable =
routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null);
Assert.assertEquals(routingTable.keySet().size(), 1);
servers.add(routingTable.keySet().iterator().next());
@@ -278,10 +279,10 @@ public class PartitionAwareOfflineRoutingTableBuilderTest {
RoutingTableBuilder routingTableBuilder =
buildPartitionAwareOfflineRoutingTableBuilder(fakePropertyStore, tableConfig, externalView, instanceConfigs);
- Set<String> servers = new HashSet<>();
+ Set<ServerInstance> servers = new HashSet<>();
for (int i = 0; i < 100; i++) {
String countStarQuery = "select count(*) from " + OFFLINE_TABLE_NAME;
- Map<String, List<String>> routingTable =
+ Map<ServerInstance, List<String>> routingTable =
routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null);
Assert.assertEquals(routingTable.keySet().size(), 1);
servers.add(routingTable.keySet().iterator().next());
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilderTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilderTest.java
index 6274b14..a4ccff4 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilderTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilderTest.java
@@ -39,6 +39,7 @@ import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.response.ServerInstance;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.pql.parsers.Pql2Compiler;
@@ -104,7 +105,7 @@ public class PartitionAwareRealtimeRoutingTableBuilderTest {
// Check the query that requires to scan all segment.
String countStarQuery = "select count(*) from myTable";
- Map<String, List<String>> routingTable =
+ Map<ServerInstance, List<String>> routingTable =
routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null);
// Check that all segments are covered exactly for once.
@@ -181,7 +182,7 @@ public class PartitionAwareRealtimeRoutingTableBuilderTest {
// Check the query that requires to scan all segment.
String countStarQuery = "select count(*) from myTable";
- Map<String, List<String>> routingTable =
+ Map<ServerInstance, List<String>> routingTable =
routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null);
// Check that all segments are covered exactly for once.
@@ -254,10 +255,10 @@ public class PartitionAwareRealtimeRoutingTableBuilderTest {
// Compute routing table
routingTableBuilder.computeOnExternalViewChange(REALTIME_TABLE_NAME, newExternalView, instanceConfigs);
- Set<String> servers = new HashSet<>();
+ Set<ServerInstance> servers = new HashSet<>();
for (int i = 0; i < 100; i++) {
String countStarQuery = "select count(*) from " + REALTIME_TABLE_NAME;
- Map<String, List<String>> routingTable =
+ Map<ServerInstance, List<String>> routingTable =
routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null);
Assert.assertEquals(routingTable.keySet().size(), 1);
servers.addAll(routingTable.keySet());
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/ServerInstance.java b/pinot-common/src/main/java/org/apache/pinot/common/response/ServerInstance.java
index adbe111..e6bda2f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/ServerInstance.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/ServerInstance.java
@@ -144,6 +144,11 @@ public class ServerInstance {
return _port;
}
+ public String getInstanceName() {
+ return toString();
+ }
+
+
public ServerInstance withSeq(int seq) {
return new ServerInstance(_hostName, _shortHostName, _port, seq);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
index c2d7bb3..d0fd717 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
@@ -28,6 +28,7 @@ import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.InstanceRequest;
+import org.apache.pinot.common.response.ServerInstance;
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
import org.apache.pinot.common.utils.DataTable;
import org.slf4j.Logger;
@@ -55,8 +56,8 @@ public class QueryRouter {
}
public AsyncQueryResponse submitQuery(long requestId, String rawTableName,
- @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<String, List<String>> offlineRoutingTable,
- @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<String, List<String>> realtimeRoutingTable,
+ @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<ServerInstance, List<String>> offlineRoutingTable,
+ @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable,
long timeoutMs) {
assert offlineBrokerRequest != null || realtimeBrokerRequest != null;
@@ -64,16 +65,18 @@ public class QueryRouter {
Map<Server, InstanceRequest> requestMap = new HashMap<>();
if (offlineBrokerRequest != null) {
assert offlineRoutingTable != null;
- for (Map.Entry<String, List<String>> entry : offlineRoutingTable.entrySet()) {
- Server server = new Server(entry.getKey(), TableType.OFFLINE);
+ for (Map.Entry<ServerInstance, List<String>> entry : offlineRoutingTable.entrySet()) {
+ ServerInstance serverInstance = entry.getKey();
+ Server server = new Server(serverInstance.getHostname(), serverInstance.getPort(), TableType.OFFLINE);
InstanceRequest instanceRequest = getInstanceRequest(requestId, offlineBrokerRequest, entry.getValue());
requestMap.put(server, instanceRequest);
}
}
if (realtimeBrokerRequest != null) {
assert realtimeRoutingTable != null;
- for (Map.Entry<String, List<String>> entry : realtimeRoutingTable.entrySet()) {
- Server server = new Server(entry.getKey(), TableType.REALTIME);
+ for (Map.Entry<ServerInstance, List<String>> entry : realtimeRoutingTable.entrySet()) {
+ ServerInstance serverInstance = entry.getKey();
+ Server server = new Server(serverInstance.getHostname(), serverInstance.getPort(), TableType.REALTIME);
InstanceRequest instanceRequest = getInstanceRequest(requestId, realtimeBrokerRequest, entry.getValue());
requestMap.put(server, instanceRequest);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/Server.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/Server.java
index d8849f9..033eb9c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/Server.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/Server.java
@@ -53,6 +53,12 @@ public class Server {
_tableType = tableType;
}
+ public Server(String hostName, int port, TableType tableType) {
+ _hostName = hostName;
+ _port = port;
+ _tableType = tableType;
+ }
+
public String getHostName() {
return _hostName;
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRouterTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRouterTest.java
index 7ab391a..646431d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRouterTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRouterTest.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.response.ServerInstance;
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.datatable.DataTableImplV2;
@@ -39,8 +40,8 @@ public class QueryRouterTest {
private static final Server OFFLINE_SERVER = new Server(SERVER_INSTANCE_NAME, TableType.OFFLINE);
private static final Server REALTIME_SERVER = new Server(SERVER_INSTANCE_NAME, TableType.REALTIME);
private static final BrokerRequest BROKER_REQUEST = new BrokerRequest();
- private static final Map<String, List<String>> ROUTING_TABLE =
- Collections.singletonMap(SERVER_INSTANCE_NAME, Collections.emptyList());
+ private static final Map<ServerInstance, List<String>> ROUTING_TABLE =
+ Collections.singletonMap(ServerInstance.forInstanceName(SERVER_INSTANCE_NAME), Collections.emptyList());
private QueryRouter _queryRouter;
diff --git a/pinot-transport/src/main/java/org/apache/pinot/transport/config/PerTableRoutingConfig.java b/pinot-transport/src/main/java/org/apache/pinot/transport/config/PerTableRoutingConfig.java
index 2f65230..80f9536 100644
--- a/pinot-transport/src/main/java/org/apache/pinot/transport/config/PerTableRoutingConfig.java
+++ b/pinot-transport/src/main/java/org/apache/pinot/transport/config/PerTableRoutingConfig.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.configuration.Configuration;
+import org.apache.pinot.common.response.ServerInstance;
/**
@@ -125,10 +126,10 @@ public class PerTableRoutingConfig {
*
* @return
*/
- public Map<String, List<String>> buildRequestRoutingMap() {
- Map<String, List<String>> resultMap = new HashMap<>();
+ public Map<ServerInstance, List<String>> buildRequestRoutingMap() {
+ Map<ServerInstance, List<String>> resultMap = new HashMap<>();
for (String serverName : _defaultServers) {
- resultMap.put(serverName, Collections.singletonList("default"));
+ resultMap.put(ServerInstance.forInstanceName(serverName), Collections.singletonList("default"));
}
return resultMap;
}
diff --git a/pinot-transport/src/main/java/org/apache/pinot/transport/scattergather/ScatterGatherImpl.java b/pinot-transport/src/main/java/org/apache/pinot/transport/scattergather/ScatterGatherImpl.java
index 64d852d..9857f78 100644
--- a/pinot-transport/src/main/java/org/apache/pinot/transport/scattergather/ScatterGatherImpl.java
+++ b/pinot-transport/src/main/java/org/apache/pinot/transport/scattergather/ScatterGatherImpl.java
@@ -96,14 +96,14 @@ public class ScatterGatherImpl implements ScatterGather {
ScatterGatherStats scatterGatherStats, Boolean isOfflineTable, BrokerMetrics brokerMetrics)
throws InterruptedException {
ScatterGatherRequest scatterGatherRequest = scatterGatherRequestContext._request;
- Map<String, List<String>> routingTable = scatterGatherRequest.getRoutingTable();
+ Map<ServerInstance, List<String>> routingTable = scatterGatherRequest.getRoutingTable();
CountDownLatch requestDispatchLatch = new CountDownLatch(routingTable.size());
// async checkout of connections and then dispatch of request
List<SingleRequestHandler> handlers = new ArrayList<>(routingTable.size());
- for (Entry<String, List<String>> entry : routingTable.entrySet()) {
- ServerInstance serverInstance = ServerInstance.forInstanceName(entry.getKey());
+ for (Entry<ServerInstance, List<String>> entry : routingTable.entrySet()) {
+ ServerInstance serverInstance = entry.getKey();
String shortServerName = serverInstance.getShortHostName();
if (isOfflineTable != null) {
if (isOfflineTable) {
diff --git a/pinot-transport/src/main/java/org/apache/pinot/transport/scattergather/ScatterGatherRequest.java b/pinot-transport/src/main/java/org/apache/pinot/transport/scattergather/ScatterGatherRequest.java
index 8ea9ee6..d3c631d 100644
--- a/pinot-transport/src/main/java/org/apache/pinot/transport/scattergather/ScatterGatherRequest.java
+++ b/pinot-transport/src/main/java/org/apache/pinot/transport/scattergather/ScatterGatherRequest.java
@@ -21,6 +21,7 @@ package org.apache.pinot.transport.scattergather;
import java.util.List;
import java.util.Map;
import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.response.ServerInstance;
/**
@@ -35,7 +36,7 @@ public interface ScatterGatherRequest {
*
* @return Map from server to list of segments
*/
- Map<String, List<String>> getRoutingTable();
+ Map<ServerInstance, List<String>> getRoutingTable();
/**
* Get the request to be sent to the server.
diff --git a/pinot-transport/src/test/java/org/apache/pinot/transport/perf/ScatterGatherPerfClient.java b/pinot-transport/src/test/java/org/apache/pinot/transport/perf/ScatterGatherPerfClient.java
index bf4422c..ab85a48 100644
--- a/pinot-transport/src/test/java/org/apache/pinot/transport/perf/ScatterGatherPerfClient.java
+++ b/pinot-transport/src/test/java/org/apache/pinot/transport/perf/ScatterGatherPerfClient.java
@@ -388,7 +388,7 @@ public class ScatterGatherPerfClient implements Runnable {
public static class SimpleScatterGatherRequest implements ScatterGatherRequest {
private final byte[] _brokerRequest;
private final long _requestId;
- private final Map<String, List<String>> _pgToServersMap;
+ private final Map<ServerInstance, List<String>> _pgToServersMap;
public SimpleScatterGatherRequest(byte[] q, PerTableRoutingConfig routingConfig, long requestId) {
_brokerRequest = q;
@@ -397,7 +397,7 @@ public class ScatterGatherPerfClient implements Runnable {
}
@Override
- public Map<String, List<String>> getRoutingTable() {
+ public Map<ServerInstance, List<String>> getRoutingTable() {
return _pgToServersMap;
}
diff --git a/pinot-transport/src/test/java/org/apache/pinot/transport/scattergather/ScatterGatherTest.java b/pinot-transport/src/test/java/org/apache/pinot/transport/scattergather/ScatterGatherTest.java
index 6871169..ddd1a8e 100644
--- a/pinot-transport/src/test/java/org/apache/pinot/transport/scattergather/ScatterGatherTest.java
+++ b/pinot-transport/src/test/java/org/apache/pinot/transport/scattergather/ScatterGatherTest.java
@@ -66,7 +66,7 @@ public class ScatterGatherTest {
NettyServer[] nettyServers = new NettyServer[NUM_SERVERS];
String[] serverNames = new String[NUM_SERVERS];
ServerInstance[] serverInstances = new ServerInstance[NUM_SERVERS];
- Map<String, List<String>> routingTable = new HashMap<>(NUM_SERVERS);
+ Map<ServerInstance, List<String>> routingTable = new HashMap<>(NUM_SERVERS);
for (int i = 0; i < NUM_SERVERS; i++) {
int serverPort = BASE_SERVER_PORT + i;
@@ -77,7 +77,7 @@ public class ScatterGatherTest {
+ ServerInstance.NAME_PORT_DELIMITER_FOR_INSTANCE_NAME + serverPort;
serverNames[i] = serverName;
serverInstances[i] = ServerInstance.forInstanceName(serverName);
- routingTable.put(serverName, Collections.singletonList("segment_" + i));
+ routingTable.put(serverInstances[i], Collections.singletonList("segment_" + i));
}
// Setup client
@@ -122,7 +122,7 @@ public class ScatterGatherTest {
NettyServer[] nettyServers = new NettyServer[NUM_SERVERS];
String[] serverNames = new String[NUM_SERVERS];
ServerInstance[] serverInstances = new ServerInstance[NUM_SERVERS];
- Map<String, List<String>> routingTable = new HashMap<>(NUM_SERVERS);
+ Map<ServerInstance, List<String>> routingTable = new HashMap<>(NUM_SERVERS);
for (int i = 0; i < NUM_SERVERS; i++) {
int serverPort = BASE_SERVER_PORT + i;
@@ -139,7 +139,7 @@ public class ScatterGatherTest {
+ ServerInstance.NAME_PORT_DELIMITER_FOR_INSTANCE_NAME + serverPort;
serverNames[i] = serverName;
serverInstances[i] = ServerInstance.forInstanceName(serverName);
- routingTable.put(serverName, Collections.singletonList("segment_" + i));
+ routingTable.put(serverInstances[i], Collections.singletonList("segment_" + i));
}
// Setup client
@@ -185,7 +185,7 @@ public class ScatterGatherTest {
NettyServer[] nettyServers = new NettyServer[NUM_SERVERS];
String[] serverNames = new String[NUM_SERVERS];
ServerInstance[] serverInstances = new ServerInstance[NUM_SERVERS];
- Map<String, List<String>> routingTable = new HashMap<>(NUM_SERVERS);
+ Map<ServerInstance, List<String>> routingTable = new HashMap<>(NUM_SERVERS);
for (int i = 0; i < NUM_SERVERS; i++) {
int serverPort = BASE_SERVER_PORT + i;
@@ -203,7 +203,7 @@ public class ScatterGatherTest {
+ ServerInstance.NAME_PORT_DELIMITER_FOR_INSTANCE_NAME + serverPort;
serverNames[i] = serverName;
serverInstances[i] = ServerInstance.forInstanceName(serverName);
- routingTable.put(serverName, Collections.singletonList("segment_" + i));
+ routingTable.put(serverInstances[i], Collections.singletonList("segment_" + i));
}
// Setup client
@@ -257,16 +257,16 @@ public class ScatterGatherTest {
}
private static class TestScatterGatherRequest implements ScatterGatherRequest {
- private final Map<String, List<String>> _routingTable;
+ private final Map<ServerInstance, List<String>> _routingTable;
private final long _timeoutMs;
- public TestScatterGatherRequest(Map<String, List<String>> routingTable, long timeoutMs) {
+ public TestScatterGatherRequest(Map<ServerInstance, List<String>> routingTable, long timeoutMs) {
_routingTable = routingTable;
_timeoutMs = timeoutMs;
}
@Override
- public Map<String, List<String>> getRoutingTable() {
+ public Map<ServerInstance, List<String>> getRoutingTable() {
return _routingTable;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org