You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xb...@apache.org on 2023/11/29 01:00:33 UTC
(pinot) branch master updated: Allow optional segments that can be skipped by servers without failing the query (#11978)
This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3bd74b36e4 Allow optional segments that can be skipped by servers without failing the query (#11978)
3bd74b36e4 is described below
commit 3bd74b36e4fad0ba35579854339a07d86f5bebd5
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Tue Nov 28 17:00:27 2023 -0800
Allow optional segments that can be skipped by servers without failing the query (#11978)
* Collect optional segments and pass them to servers to handle, instead of simply skipping them at broker side, which could lead to wrong query results, particularly for upsert tables
---
.../broker/api/resources/PinotBrokerDebug.java | 90 ++++++++--
.../requesthandler/BaseBrokerRequestHandler.java | 26 +--
.../requesthandler/GrpcBrokerRequestHandler.java | 19 ++-
.../MultiStageBrokerRequestHandler.java | 9 +-
.../SingleConnectionBrokerRequestHandler.java | 8 +-
.../pinot/broker/routing/BrokerRoutingManager.java | 35 +++-
.../instanceselector/BalancedInstanceSelector.java | 17 +-
.../instanceselector/BaseInstanceSelector.java | 22 ++-
.../routing/instanceselector/InstanceSelector.java | 21 ++-
.../MultiStageReplicaGroupSelector.java | 20 ++-
.../ReplicaGroupInstanceSelector.java | 40 +++--
.../StrictReplicaGroupInstanceSelector.java | 4 +
.../broker/broker/HelixBrokerStarterTest.java | 9 +-
.../BaseBrokerRequestHandlerTest.java | 9 +-
.../instanceselector/InstanceSelectorTest.java | 53 +++---
.../pinot/common/request/InstanceRequest.java | 186 +++++++++++++++++++--
pinot-common/src/thrift/request.thrift | 1 +
.../common/reader/PinotServerDataFetcher.scala | 11 +-
.../core/data/manager/BaseTableDataManager.java | 15 ++
.../query/executor/ServerQueryExecutorV1Impl.java | 8 +-
.../pinot/core/query/logger/ServerQueryLogger.java | 2 +-
.../core/query/request/ServerQueryRequest.java | 8 +
.../apache/pinot/core/routing/RoutingTable.java | 13 +-
.../apache/pinot/core/transport/QueryRouter.java | 24 ++-
.../pinot/core/transport/QueryRoutingTest.java | 5 +-
.../apache/pinot/query/routing/WorkerManager.java | 12 +-
.../query/testutils/MockRoutingManagerFactory.java | 12 +-
.../testutils/MockInstanceDataManagerFactory.java | 4 +-
.../local/data/manager/TableDataManager.java | 5 +
29 files changed, 522 insertions(+), 166 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
index 30e995298c..f9799d7e75 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
@@ -27,10 +27,12 @@ import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@@ -42,6 +44,7 @@ import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.request.BrokerRequest;
@@ -114,13 +117,45 @@ public class PinotBrokerDebug {
public Map<String, Map<ServerInstance, List<String>>> getRoutingTable(
@ApiParam(value = "Name of the table") @PathParam("tableName") String tableName) {
Map<String, Map<ServerInstance, List<String>>> result = new TreeMap<>();
+ getRoutingTable(tableName, (tableNameWithType, routingTable) -> result.put(tableNameWithType,
+ removeOptionalSegments(routingTable.getServerInstanceToSegmentsMap())));
+ if (!result.isEmpty()) {
+ return result;
+ } else {
+ throw new WebApplicationException("Cannot find routing for table: " + tableName, Response.Status.NOT_FOUND);
+ }
+ }
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/debug/routingTableWithOptionalSegments/{tableName}")
+ @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_ROUTING_TABLE)
+ @ApiOperation(value = "Get the routing table for a table, including optional segments")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Routing table"),
+ @ApiResponse(code = 404, message = "Routing not found"),
+ @ApiResponse(code = 500, message = "Internal server error")
+ })
+ public Map<String, Map<ServerInstance, Pair<List<String>, List<String>>>> getRoutingTableWithOptionalSegments(
+ @ApiParam(value = "Name of the table") @PathParam("tableName") String tableName) {
+ Map<String, Map<ServerInstance, Pair<List<String>, List<String>>>> result = new TreeMap<>();
+ getRoutingTable(tableName, (tableNameWithType, routingTable) -> result.put(tableNameWithType,
+ routingTable.getServerInstanceToSegmentsMap()));
+ if (!result.isEmpty()) {
+ return result;
+ } else {
+ throw new WebApplicationException("Cannot find routing for table: " + tableName, Response.Status.NOT_FOUND);
+ }
+ }
+
+ private void getRoutingTable(String tableName, BiConsumer<String, RoutingTable> consumer) {
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
if (tableType != TableType.REALTIME) {
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
RoutingTable routingTable = _routingManager.getRoutingTable(
CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + offlineTableName), getRequestId());
if (routingTable != null) {
- result.put(offlineTableName, routingTable.getServerInstanceToSegmentsMap());
+ consumer.accept(offlineTableName, routingTable);
}
}
if (tableType != TableType.OFFLINE) {
@@ -128,14 +163,16 @@ public class PinotBrokerDebug {
RoutingTable routingTable = _routingManager.getRoutingTable(
CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + realtimeTableName), getRequestId());
if (routingTable != null) {
- result.put(realtimeTableName, routingTable.getServerInstanceToSegmentsMap());
+ consumer.accept(realtimeTableName, routingTable);
}
}
- if (!result.isEmpty()) {
- return result;
- } else {
- throw new WebApplicationException("Cannot find routing for table: " + tableName, Response.Status.NOT_FOUND);
- }
+ }
+
+ private static Map<ServerInstance, List<String>> removeOptionalSegments(
+ Map<ServerInstance, Pair<List<String>, List<String>>> serverInstanceToSegmentsMap) {
+ Map<ServerInstance, List<String>> ret = new HashMap<>();
+ serverInstanceToSegmentsMap.forEach((k, v) -> ret.put(k, v.getLeft()));
+ return ret;
}
@GET
@@ -152,7 +189,39 @@ public class PinotBrokerDebug {
@ApiParam(value = "SQL query (table name should have type suffix)") @QueryParam("query") String query,
@Context HttpHeaders httpHeaders) {
BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(query);
+ checkAccessControl(brokerRequest, httpHeaders);
+ RoutingTable routingTable = _routingManager.getRoutingTable(brokerRequest, getRequestId());
+ if (routingTable != null) {
+ return removeOptionalSegments(routingTable.getServerInstanceToSegmentsMap());
+ } else {
+ throw new WebApplicationException("Cannot find routing for query: " + query, Response.Status.NOT_FOUND);
+ }
+ }
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/debug/routingTableWithOptionalSegments/sql")
+ @ManualAuthorization
+ @ApiOperation(value = "Get the routing table for a query, including optional segments")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Routing table"),
+ @ApiResponse(code = 404, message = "Routing not found"),
+ @ApiResponse(code = 500, message = "Internal server error")
+ })
+ public Map<ServerInstance, Pair<List<String>, List<String>>> getRoutingTableForQueryWithOptionalSegments(
+ @ApiParam(value = "SQL query (table name should have type suffix)") @QueryParam("query") String query,
+ @Context HttpHeaders httpHeaders) {
+ BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(query);
+ checkAccessControl(brokerRequest, httpHeaders);
+ RoutingTable routingTable = _routingManager.getRoutingTable(brokerRequest, getRequestId());
+ if (routingTable != null) {
+ return routingTable.getServerInstanceToSegmentsMap();
+ } else {
+ throw new WebApplicationException("Cannot find routing for query: " + query, Response.Status.NOT_FOUND);
+ }
+ }
+ private void checkAccessControl(BrokerRequest brokerRequest, HttpHeaders httpHeaders) {
// TODO: Handle nested queries
if (brokerRequest.isSetQuerySource() && brokerRequest.getQuerySource().isSetTableName()) {
if (!_accessControlFactory.create()
@@ -163,13 +232,6 @@ public class PinotBrokerDebug {
} else {
throw new WebApplicationException("Table name is not set in the query", Response.Status.BAD_REQUEST);
}
-
- RoutingTable routingTable = _routingManager.getRoutingTable(brokerRequest, getRequestId());
- if (routingTable != null) {
- return routingTable.getServerInstanceToSegmentsMap();
- } else {
- throw new WebApplicationException("Cannot find routing for query: " + query, Response.Status.NOT_FOUND);
- }
}
/**
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index bcc5049ff0..c93248fbe4 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -43,6 +43,7 @@ import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
@@ -592,8 +593,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
// Calculate routing table for the query
// TODO: Modify RoutingManager interface to directly take PinotQuery
long routingStartTimeNs = System.nanoTime();
- Map<ServerInstance, List<String>> offlineRoutingTable = null;
- Map<ServerInstance, List<String>> realtimeRoutingTable = null;
+ Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable = null;
+ Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable = null;
List<String> unavailableSegments = new ArrayList<>();
int numPrunedSegmentsTotal = 0;
if (offlineBrokerRequest != null) {
@@ -601,7 +602,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
RoutingTable routingTable = _routingManager.getRoutingTable(offlineBrokerRequest, requestId);
if (routingTable != null) {
unavailableSegments.addAll(routingTable.getUnavailableSegments());
- Map<ServerInstance, List<String>> serverInstanceToSegmentsMap = routingTable.getServerInstanceToSegmentsMap();
+ Map<ServerInstance, Pair<List<String>, List<String>>> serverInstanceToSegmentsMap =
+ routingTable.getServerInstanceToSegmentsMap();
if (!serverInstanceToSegmentsMap.isEmpty()) {
offlineRoutingTable = serverInstanceToSegmentsMap;
} else {
@@ -617,7 +619,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
RoutingTable routingTable = _routingManager.getRoutingTable(realtimeBrokerRequest, requestId);
if (routingTable != null) {
unavailableSegments.addAll(routingTable.getUnavailableSegments());
- Map<ServerInstance, List<String>> serverInstanceToSegmentsMap = routingTable.getServerInstanceToSegmentsMap();
+ Map<ServerInstance, Pair<List<String>, List<String>>> serverInstanceToSegmentsMap =
+ routingTable.getServerInstanceToSegmentsMap();
if (!serverInstanceToSegmentsMap.isEmpty()) {
realtimeRoutingTable = serverInstanceToSegmentsMap;
} else {
@@ -1815,9 +1818,10 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
*/
protected abstract BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest,
- @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest,
- @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats,
- RequestContext requestContext)
+ @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable,
+ @Nullable BrokerRequest realtimeBrokerRequest,
+ @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable, long timeoutMs,
+ ServerStats serverStats, RequestContext requestContext)
throws Exception;
protected static boolean isPartialResult(BrokerResponse brokerResponse) {
@@ -1858,8 +1862,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
statistics.setNumSegmentsPrunedByValue(response.getNumSegmentsPrunedByValue());
statistics.setExplainPlanNumEmptyFilterSegments(response.getExplainPlanNumEmptyFilterSegments());
statistics.setExplainPlanNumMatchAllFilterSegments(response.getExplainPlanNumMatchAllFilterSegments());
- statistics.setProcessingExceptions(response.getProcessingExceptions().stream().map(Object::toString).collect(
- Collectors.toList()));
+ statistics.setProcessingExceptions(
+ response.getProcessingExceptions().stream().map(Object::toString).collect(Collectors.toList()));
}
private String getGlobalQueryId(long requestId) {
@@ -1888,8 +1892,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
final String _query;
final Set<ServerInstance> _servers = new HashSet<>();
- QueryServers(String query, @Nullable Map<ServerInstance, List<String>> offlineRoutingTable,
- @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable) {
+ QueryServers(String query, @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable,
+ @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable) {
_query = query;
if (offlineRoutingTable != null) {
_servers.addAll(offlineRoutingTable.keySet());
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
index b2e36d16ef..c2cd3d6027 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
@@ -88,9 +89,10 @@ public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler {
@Override
protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest,
- @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest,
- @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats,
- RequestContext requestContext)
+ @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable,
+ @Nullable BrokerRequest realtimeBrokerRequest,
+ @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable, long timeoutMs,
+ ServerStats serverStats, RequestContext requestContext)
throws Exception {
// TODO: Support failure detection
assert offlineBrokerRequest != null || realtimeBrokerRequest != null;
@@ -106,8 +108,8 @@ public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler {
requestContext.isSampledRequest());
}
final long startReduceTimeNanos = System.nanoTime();
- BrokerResponseNative brokerResponse = _streamingReduceService.reduceOnStreamResponse(originalBrokerRequest,
- responseMap, timeoutMs, _brokerMetrics);
+ BrokerResponseNative brokerResponse =
+ _streamingReduceService.reduceOnStreamResponse(originalBrokerRequest, responseMap, timeoutMs, _brokerMetrics);
requestContext.setReduceTimeNanos(System.nanoTime() - startReduceTimeNanos);
return brokerResponse;
}
@@ -116,11 +118,12 @@ public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler {
* Query pinot server for data table.
*/
private void sendRequest(long requestId, TableType tableType, BrokerRequest brokerRequest,
- Map<ServerInstance, List<String>> routingTable,
+ Map<ServerInstance, Pair<List<String>, List<String>>> routingTable,
Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap, boolean trace) {
- for (Map.Entry<ServerInstance, List<String>> routingEntry : routingTable.entrySet()) {
+ for (Map.Entry<ServerInstance, Pair<List<String>, List<String>>> routingEntry : routingTable.entrySet()) {
ServerInstance serverInstance = routingEntry.getKey();
- List<String> segments = routingEntry.getValue();
+ // TODO: support optional segments for GrpcQueryServer.
+ List<String> segments = routingEntry.getValue().getLeft();
String serverHost = serverInstance.getHostname();
int port = serverInstance.getGrpcPort();
// TODO: enable throttling on per host bases.
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index d329a99f69..9e74419830 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -31,6 +31,7 @@ import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import org.apache.calcite.jdbc.CalciteSchemaBuilder;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.broker.api.AccessControl;
import org.apache.pinot.broker.api.RequesterIdentity;
import org.apache.pinot.broker.broker.AccessControlFactory;
@@ -314,9 +315,11 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
@Override
protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest,
- @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest,
- @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats,
- RequestContext requestContext) {
+ @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable,
+ @Nullable BrokerRequest realtimeBrokerRequest,
+ @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable, long timeoutMs,
+ ServerStats serverStats, RequestContext requestContext)
+ throws Exception {
throw new UnsupportedOperationException();
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index d44532a500..68ae70f9eb 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.failuredetector.FailureDetector;
import org.apache.pinot.broker.failuredetector.FailureDetectorFactory;
@@ -101,9 +102,10 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl
@Override
protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest,
- @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest,
- @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats,
- RequestContext requestContext)
+ @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable,
+ @Nullable BrokerRequest realtimeBrokerRequest,
+ @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable, long timeoutMs,
+ ServerStats serverStats, RequestContext requestContext)
throws Exception {
assert offlineBrokerRequest != null || realtimeBrokerRequest != null;
if (requestContext.isSampledRequest()) {
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
index 3cd14e676f..cc3a5354ef 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixConstants.ChangeType;
@@ -610,19 +611,38 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle
return null;
}
InstanceSelector.SelectionResult selectionResult = routingEntry.calculateRouting(brokerRequest, requestId);
- Map<String, String> segmentToInstanceMap = selectionResult.getSegmentToInstanceMap();
- Map<ServerInstance, List<String>> serverInstanceToSegmentsMap = new HashMap<>();
- for (Map.Entry<String, String> entry : segmentToInstanceMap.entrySet()) {
+ return new RoutingTable(getServerInstanceToSegmentsMap(tableNameWithType, selectionResult),
+ selectionResult.getUnavailableSegments(), selectionResult.getNumPrunedSegments());
+ }
+
+ private Map<ServerInstance, Pair<List<String>, List<String>>> getServerInstanceToSegmentsMap(String tableNameWithType,
+ InstanceSelector.SelectionResult selectionResult) {
+ Map<ServerInstance, Pair<List<String>, List<String>>> merged = new HashMap<>();
+ for (Map.Entry<String, String> entry : selectionResult.getSegmentToInstanceMap().entrySet()) {
ServerInstance serverInstance = _enabledServerInstanceMap.get(entry.getValue());
if (serverInstance != null) {
- serverInstanceToSegmentsMap.computeIfAbsent(serverInstance, k -> new ArrayList<>()).add(entry.getKey());
+ Pair<List<String>, List<String>> pair =
+ merged.computeIfAbsent(serverInstance, k -> Pair.of(new ArrayList<>(), new ArrayList<>()));
+ pair.getLeft().add(entry.getKey());
} else {
// Should not happen in normal case unless encountered unexpected exception when updating routing entries
_brokerMetrics.addMeteredTableValue(tableNameWithType, BrokerMeter.SERVER_MISSING_FOR_ROUTING, 1L);
}
}
- return new RoutingTable(serverInstanceToSegmentsMap, selectionResult.getUnavailableSegments(),
- selectionResult.getNumPrunedSegments());
+ for (Map.Entry<String, String> entry : selectionResult.getOptionalSegmentToInstanceMap().entrySet()) {
+ ServerInstance serverInstance = _enabledServerInstanceMap.get(entry.getValue());
+ if (serverInstance != null) {
+ Pair<List<String>, List<String>> pair = merged.get(serverInstance);
+ // Skip servers that don't have non-optional segments, so that servers always get some non-optional segments
+ // to process, to be backward compatible.
+ // TODO: allow servers only with optional segments
+ if (pair != null) {
+ pair.getRight().add(entry.getKey());
+ }
+ }
+ // TODO: Report missing server metrics when we allow servers only with optional segments.
+ }
+ return merged;
}
@Override
@@ -795,7 +815,8 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle
selectionResult.setNumPrunedSegments(numPrunedSegments);
return selectionResult;
} else {
- return new InstanceSelector.SelectionResult(Collections.emptyMap(), Collections.emptyList(), numPrunedSegments);
+ return new InstanceSelector.SelectionResult(Pair.of(Collections.emptyMap(), Collections.emptyMap()),
+ Collections.emptyList(), numPrunedSegments);
}
}
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
index 17c9d5e5b8..77b5389fd4 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
@@ -54,9 +55,11 @@ public class BalancedInstanceSelector extends BaseInstanceSelector {
}
@Override
- Map<String, String> select(List<String> segments, int requestId, SegmentStates segmentStates,
- Map<String, String> queryOptions) {
+ Pair<Map<String, String>, Map<String, String>> select(List<String> segments, int requestId,
+ SegmentStates segmentStates, Map<String, String> queryOptions) {
Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
+ // No need to adjust this map per total segment numbers, as optional segments should be empty most of the time.
+ Map<String, String> optionalSegmentToInstanceMap = new HashMap<>();
if (_adaptiveServerSelector != null) {
for (String segment : segments) {
List<SegmentInstanceCandidate> candidates = segmentStates.getCandidates(segment);
@@ -70,8 +73,12 @@ public class BalancedInstanceSelector extends BaseInstanceSelector {
candidateInstances.add(candidate.getInstance());
}
String selectedInstance = _adaptiveServerSelector.select(candidateInstances);
+ // This can only be offline when it is a new segment. Such a segment is marked as an optional segment so that
+ // the broker or server can skip it upon any issue processing it.
if (candidates.get(candidateInstances.indexOf(selectedInstance)).isOnline()) {
segmentToSelectedInstanceMap.put(segment, selectedInstance);
+ } else {
+ optionalSegmentToInstanceMap.put(segment, selectedInstance);
}
}
} else {
@@ -84,11 +91,15 @@ public class BalancedInstanceSelector extends BaseInstanceSelector {
}
int selectedIdx = requestId++ % candidates.size();
SegmentInstanceCandidate selectedCandidate = candidates.get(selectedIdx);
+ // This can only be offline when it is a new segment. Such a segment is marked as an optional segment so that
+ // the broker or server can skip it upon any issue processing it.
if (selectedCandidate.isOnline()) {
segmentToSelectedInstanceMap.put(segment, selectedCandidate.getInstance());
+ } else {
+ optionalSegmentToInstanceMap.put(segment, selectedCandidate.getInstance());
}
}
}
- return segmentToSelectedInstanceMap;
+ return Pair.of(segmentToSelectedInstanceMap, optionalSegmentToInstanceMap);
}
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
index 972142663e..b2961eef94 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
@@ -30,6 +30,7 @@ import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.AccessOption;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
@@ -270,6 +271,10 @@ abstract class BaseInstanceSelector implements InstanceSelector {
}
}
}
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Got _newSegmentStateMap: {}, _oldSegmentCandidatesMap: {}", _newSegmentStateMap.keySet(),
+ _oldSegmentCandidatesMap.keySet());
+ }
}
/**
@@ -408,10 +413,11 @@ abstract class BaseInstanceSelector implements InstanceSelector {
// Copy the volatile reference so that segmentToInstanceMap and unavailableSegments can have a consistent view of
// the state.
SegmentStates segmentStates = _segmentStates;
- Map<String, String> segmentToInstanceMap = select(segments, requestIdInt, segmentStates, queryOptions);
+ Pair<Map<String, String>, Map<String, String>> segmentToInstanceMap =
+ select(segments, requestIdInt, segmentStates, queryOptions);
Set<String> unavailableSegments = segmentStates.getUnavailableSegments();
if (unavailableSegments.isEmpty()) {
- return new SelectionResult(segmentToInstanceMap, Collections.emptyList());
+ return new SelectionResult(segmentToInstanceMap, Collections.emptyList(), 0);
} else {
List<String> unavailableSegmentsForRequest = new ArrayList<>();
for (String segment : segments) {
@@ -419,7 +425,7 @@ abstract class BaseInstanceSelector implements InstanceSelector {
unavailableSegmentsForRequest.add(segment);
}
}
- return new SelectionResult(segmentToInstanceMap, unavailableSegmentsForRequest);
+ return new SelectionResult(segmentToInstanceMap, unavailableSegmentsForRequest, 0);
}
}
@@ -429,9 +435,11 @@ abstract class BaseInstanceSelector implements InstanceSelector {
}
/**
- * Selects the server instances for the given segments based on the request id and segment states. Returns a map
- * from segment to selected server instance hosting the segment.
+ * Selects the server instances for the given segments based on the request id and segment states. Returns two maps
+ * from segment to selected server instance hosting the segment. The 2nd map is for optional segments. The optional
+ * segments are used to get the new segments that are not online yet. Instead of simply skipping them by broker at
+ * routing time, we can send them to servers and let servers decide how to handle them.
*/
- abstract Map<String, String> select(List<String> segments, int requestId, SegmentStates segmentStates,
- Map<String, String> queryOptions);
+ abstract Pair<Map<String, String>, Map<String, String>/*optional segments*/> select(List<String> segments,
+ int requestId, SegmentStates segmentStates, Map<String, String> queryOptions);
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
index 9ffe830229..d003723c5b 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
@@ -75,16 +76,12 @@ public interface InstanceSelector {
Set<String> getServingInstances();
class SelectionResult {
- private final Map<String, String> _segmentToInstanceMap;
+ private final Pair<Map<String, String>, Map<String, String>/*optional segments*/> _segmentToInstanceMap;
private final List<String> _unavailableSegments;
private int _numPrunedSegments;
- public SelectionResult(Map<String, String> segmentToInstanceMap, List<String> unavailableSegments) {
- this(segmentToInstanceMap, unavailableSegments, 0);
- }
-
- public SelectionResult(Map<String, String> segmentToInstanceMap, List<String> unavailableSegments,
- int numPrunedSegments) {
+ public SelectionResult(Pair<Map<String, String>, Map<String, String>> segmentToInstanceMap,
+ List<String> unavailableSegments, int numPrunedSegments) {
_segmentToInstanceMap = segmentToInstanceMap;
_unavailableSegments = unavailableSegments;
_numPrunedSegments = numPrunedSegments;
@@ -94,7 +91,15 @@ public interface InstanceSelector {
* Returns the map from segment to selected server instance hosting the segment.
*/
public Map<String, String> getSegmentToInstanceMap() {
- return _segmentToInstanceMap;
+ return _segmentToInstanceMap.getLeft();
+ }
+
+ /**
+ * Returns the map from optional segment to selected server instance hosting the optional segment.
+ * Optional segments can be skipped by the broker or server upon any issue without failing the query.
+ */
+ public Map<String, String> getOptionalSegmentToInstanceMap() {
+ return _segmentToInstanceMap.getRight();
}
/**
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
index 9be701ebe5..15fb525a8c 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -81,8 +82,8 @@ public class MultiStageReplicaGroupSelector extends BaseInstanceSelector {
}
@Override
- Map<String, String> select(List<String> segments, int requestId, SegmentStates segmentStates,
- Map<String, String> queryOptions) {
+ Pair<Map<String, String>, Map<String, String>> select(List<String> segments, int requestId,
+ SegmentStates segmentStates, Map<String, String> queryOptions) {
// Create a copy of InstancePartitions to avoid race-condition with event-listeners above.
InstancePartitions instancePartitions = _instancePartitions;
int replicaGroupSelected = requestId % instancePartitions.getNumReplicaGroups();
@@ -102,14 +103,15 @@ public class MultiStageReplicaGroupSelector extends BaseInstanceSelector {
* Returns a map from the segmentName to the corresponding server in the given replica-group. If the is not enabled,
* we throw an exception.
*/
- private Map<String, String> tryAssigning(List<String> segments, SegmentStates segmentStates,
- InstancePartitions instancePartitions, int replicaId) {
+ private Pair<Map<String, String>, Map<String, String>> tryAssigning(List<String> segments,
+ SegmentStates segmentStates, InstancePartitions instancePartitions, int replicaId) {
Set<String> instanceLookUpSet = new HashSet<>();
for (int partition = 0; partition < instancePartitions.getNumPartitions(); partition++) {
List<String> instances = instancePartitions.getInstances(partition, replicaId);
instanceLookUpSet.addAll(instances);
}
- Map<String, String> result = new HashMap<>();
+ Map<String, String> segmentToSelectedInstanceMap = new HashMap<>();
+ Map<String, String> optionalSegmentToInstanceMap = new HashMap<>();
for (String segment : segments) {
List<SegmentInstanceCandidate> candidates = segmentStates.getCandidates(segment);
// If candidates are null, we will throw an exception and log a warning.
@@ -119,8 +121,12 @@ public class MultiStageReplicaGroupSelector extends BaseInstanceSelector {
String instance = candidate.getInstance();
if (instanceLookUpSet.contains(instance)) {
found = true;
+ // This can only be offline when it is a new segment. Such a segment is marked as an optional segment so that the
+ // broker or server can skip it if any issue arises while processing it.
if (candidate.isOnline()) {
- result.put(segment, instance);
+ segmentToSelectedInstanceMap.put(segment, instance);
+ } else {
+ optionalSegmentToInstanceMap.put(segment, instance);
}
break;
}
@@ -129,7 +135,7 @@ public class MultiStageReplicaGroupSelector extends BaseInstanceSelector {
throw new RuntimeException(String.format("Unable to find an enabled instance for segment: %s", segment));
}
}
- return result;
+ return Pair.of(segmentToSelectedInstanceMap, optionalSegmentToInstanceMap);
}
@VisibleForTesting
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
index 9aedaa8e66..3683ca46bb 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
@@ -69,8 +69,8 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector {
}
@Override
- Map<String, String> select(List<String> segments, int requestId, SegmentStates segmentStates,
- Map<String, String> queryOptions) {
+ Pair<Map<String, String>, Map<String, String>> select(List<String> segments, int requestId,
+ SegmentStates segmentStates, Map<String, String> queryOptions) {
if (_adaptiveServerSelector != null) {
// Adaptive Server Selection is enabled.
List<String> serverRankList = new ArrayList<>();
@@ -90,9 +90,11 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector {
}
}
- private Map<String, String> selectServersUsingRoundRobin(List<String> segments, int requestId,
- SegmentStates segmentStates, Map<String, String> queryOptions) {
- Map<String, String> selectedServers = new HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
+ private Pair<Map<String, String>, Map<String, String>> selectServersUsingRoundRobin(List<String> segments,
+ int requestId, SegmentStates segmentStates, Map<String, String> queryOptions) {
+ Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
+ // No need to size this map by the total segment count, as it should be empty most of the time.
+ Map<String, String> optionalSegmentToInstanceMap = new HashMap<>();
Integer numReplicaGroupsToQuery = QueryOptionsUtils.getNumReplicaGroupsToQuery(queryOptions);
int numReplicaGroups = numReplicaGroupsToQuery == null ? 1 : numReplicaGroupsToQuery;
int replicaOffset = 0;
@@ -107,22 +109,26 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector {
int numCandidates = candidates.size();
int instanceIdx = (requestId + replicaOffset) % numCandidates;
SegmentInstanceCandidate selectedInstance = candidates.get(instanceIdx);
- // Only put online instance.
- // This can only be offline when it is a new segment.
+ // This can only be offline when it is a new segment. Such a segment is marked as an optional segment so that the
+ // broker or server can skip it if any issue arises while processing it.
if (selectedInstance.isOnline()) {
- selectedServers.put(segment, selectedInstance.getInstance());
+ segmentToSelectedInstanceMap.put(segment, selectedInstance.getInstance());
+ } else {
+ optionalSegmentToInstanceMap.put(segment, selectedInstance.getInstance());
}
if (numReplicaGroups > numCandidates) {
numReplicaGroups = numCandidates;
}
replicaOffset = (replicaOffset + 1) % numReplicaGroups;
}
- return selectedServers;
+ return Pair.of(segmentToSelectedInstanceMap, optionalSegmentToInstanceMap);
}
- private Map<String, String> selectServersUsingAdaptiveServerSelector(List<String> segments, int requestId,
- SegmentStates segmentStates, List<String> serverRankList) {
- Map<String, String> selectedServers = new HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
+ private Pair<Map<String, String>, Map<String, String>> selectServersUsingAdaptiveServerSelector(List<String> segments,
+ int requestId, SegmentStates segmentStates, List<String> serverRankList) {
+ Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
+ // No need to size this map by the total segment count, as it should be empty most of the time.
+ Map<String, String> optionalSegmentToInstanceMap = new HashMap<>();
for (String segment : segments) {
// NOTE: candidates can be null when there is no enabled instances for the segment, or the instance selector has
// not been updated (we update all components for routing in sequence)
@@ -151,13 +157,15 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector {
}
}
}
- // Only put online instance.
- // This can only be offline when it is a new segment.
+ // This can only be offline when it is a new segment. Such a segment is marked as an optional segment so that the
+ // broker or server can skip it if any issue arises while processing it.
if (selectedInstance.isOnline()) {
- selectedServers.put(segment, selectedInstance.getInstance());
+ segmentToSelectedInstanceMap.put(segment, selectedInstance.getInstance());
+ } else {
+ optionalSegmentToInstanceMap.put(segment, selectedInstance.getInstance());
}
}
- return selectedServers;
+ return Pair.of(segmentToSelectedInstanceMap, optionalSegmentToInstanceMap);
}
private List<String> fetchCandidateServersForQuery(List<String> segments, SegmentStates segmentStates) {
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
index 8206d49452..8c352bdbe6 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
@@ -172,5 +172,9 @@ public class StrictReplicaGroupInstanceSelector extends ReplicaGroupInstanceSele
}
_newSegmentStateMap.put(segment, new NewSegmentState(newSegmentCreationTimeMap.get(segment), candidates));
}
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Got _newSegmentStateMap: {}, _oldSegmentCandidatesMap: {}", _newSegmentStateMap.keySet(),
+ _oldSegmentCandidatesMap.keySet());
+ }
}
}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index 438836a4cf..039d7a4205 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -163,7 +163,8 @@ public class HelixBrokerStarterTest extends ControllerTest {
RoutingTable routingTable = routingManager.getRoutingTable(brokerRequest, 0);
assertNotNull(routingTable);
assertEquals(routingTable.getServerInstanceToSegmentsMap().size(), NUM_SERVERS);
- assertEquals(routingTable.getServerInstanceToSegmentsMap().values().iterator().next().size(), NUM_OFFLINE_SEGMENTS);
+ assertEquals(routingTable.getServerInstanceToSegmentsMap().values().iterator().next().getLeft().size(),
+ NUM_OFFLINE_SEGMENTS);
assertTrue(routingTable.getUnavailableSegments().isEmpty());
// Add a new segment into the OFFLINE table
@@ -171,9 +172,9 @@ public class HelixBrokerStarterTest extends ControllerTest {
SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME), "downloadUrl");
TestUtils.waitForCondition(aVoid ->
- routingManager.getRoutingTable(brokerRequest, 0).getServerInstanceToSegmentsMap()
- .values().iterator().next().size() == NUM_OFFLINE_SEGMENTS + 1, 30_000L, "Failed to add the new segment "
- + "into the routing table");
+ routingManager.getRoutingTable(brokerRequest, 0).getServerInstanceToSegmentsMap().values().iterator().next()
+ .getLeft().size() == NUM_OFFLINE_SEGMENTS + 1, 30_000L,
+ "Failed to add the new segment " + "into the routing table");
// Add a new table with different broker tenant
String newRawTableName = "newTable";
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
index e38db6e020..29d2c5b428 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
@@ -28,6 +28,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.broker.broker.AllowAllAccessControlFactory;
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
@@ -199,7 +200,7 @@ public class BaseBrokerRequestHandlerTest {
RoutingTable rt = mock(RoutingTable.class);
when(rt.getServerInstanceToSegmentsMap()).thenReturn(
Collections.singletonMap(new ServerInstance(new InstanceConfig("server01_9000")),
- Collections.singletonList("segment01")));
+ Pair.of(Collections.singletonList("segment01"), Collections.emptyList())));
when(routingManager.getRoutingTable(any(), Mockito.anyLong())).thenReturn(rt);
QueryQuotaManager queryQuotaManager = mock(QueryQuotaManager.class);
when(queryQuotaManager.acquire(anyString())).thenReturn(true);
@@ -223,10 +224,10 @@ public class BaseBrokerRequestHandlerTest {
@Override
protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest,
- @Nullable Map<ServerInstance, List<String>> offlineRoutingTable,
+ @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable,
@Nullable BrokerRequest realtimeBrokerRequest,
- @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats,
- RequestContext requestContext)
+ @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable, long timeoutMs,
+ ServerStats serverStats, RequestContext requestContext)
throws Exception {
testRequestId[0] = requestId;
latch.await();
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
index fd2568e183..c748be4885 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
@@ -151,7 +151,7 @@ public class InstanceSelectorTest {
}
private static boolean isReplicaGroupType(String selectorType) {
- return selectorType.equals(RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE) || selectorType.equals(
+ return selectorType.equals(REPLICA_GROUP_INSTANCE_SELECTOR_TYPE) || selectorType.equals(
STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE);
}
@@ -165,7 +165,7 @@ public class InstanceSelectorTest {
@DataProvider(name = "selectorType")
public Object[] getSelectorType() {
return new Object[]{
- RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE,
+ REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE,
BALANCED_INSTANCE_SELECTOR
};
}
@@ -203,7 +203,7 @@ public class InstanceSelectorTest {
brokerMetrics) instanceof BalancedInstanceSelector);
// Replica-group instance selector should be returned
- when(routingConfig.getInstanceSelectorType()).thenReturn(RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE);
+ when(routingConfig.getInstanceSelectorType()).thenReturn(REPLICA_GROUP_INSTANCE_SELECTOR_TYPE);
assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, propertyStore,
brokerMetrics) instanceof ReplicaGroupInstanceSelector);
@@ -1414,15 +1414,15 @@ public class InstanceSelectorTest {
// First selection, we select instance0 for oldSeg and instance1 for newSeg in balance selector
// For replica group, we select instance0 for oldSeg and newSeg. Because newSeg is not online in instance0, so
// we exclude it from selection result.
- Map<String, String> expectedSelectionResult;
+ InstanceSelector.SelectionResult selectionResult =
+ selector.select(_brokerRequest, Lists.newArrayList(onlineSegments), requestId);
if (isReplicaGroupType(selectorType)) {
- expectedSelectionResult = ImmutableMap.of(oldSeg, instance0);
+ assertEquals(selectionResult.getSegmentToInstanceMap(), ImmutableMap.of(oldSeg, instance0));
+ assertEquals(selectionResult.getOptionalSegmentToInstanceMap(), ImmutableMap.of(newSeg, instance0));
} else {
- expectedSelectionResult = ImmutableMap.of(oldSeg, instance0, newSeg, instance1);
+ assertEquals(selectionResult.getSegmentToInstanceMap(), ImmutableMap.of(oldSeg, instance0, newSeg, instance1));
+ assertTrue(selectionResult.getOptionalSegmentToInstanceMap().isEmpty());
}
- InstanceSelector.SelectionResult selectionResult =
- selector.select(_brokerRequest, Lists.newArrayList(onlineSegments), requestId);
- assertEquals(selectionResult.getSegmentToInstanceMap(), expectedSelectionResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
}
{
@@ -1430,21 +1430,22 @@ public class InstanceSelectorTest {
// Second selection, we select instance1 for oldSeg and instance0 for newSeg in balance selector
// Because newSeg is not online in instance0, so we exclude it from selection result.
// For replica group, we select instance1 for oldSeg and newSeg.
- Map<String, String> expectedSelectionResult;
+ InstanceSelector.SelectionResult selectionResult =
+ selector.select(_brokerRequest, Lists.newArrayList(onlineSegments), requestId);
switch (selectorType) {
case BALANCED_INSTANCE_SELECTOR:
- expectedSelectionResult = ImmutableMap.of(oldSeg, instance1);
+ assertEquals(selectionResult.getSegmentToInstanceMap(), ImmutableMap.of(oldSeg, instance1));
+ assertEquals(selectionResult.getOptionalSegmentToInstanceMap(), ImmutableMap.of(newSeg, instance0));
break;
case STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE: // fall through
- case RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE:
- expectedSelectionResult = ImmutableMap.of(oldSeg, instance1, newSeg, instance1);
+ case REPLICA_GROUP_INSTANCE_SELECTOR_TYPE:
+ assertEquals(selectionResult.getSegmentToInstanceMap(),
+ ImmutableMap.of(oldSeg, instance1, newSeg, instance1));
+ assertTrue(selectionResult.getOptionalSegmentToInstanceMap().isEmpty());
break;
default:
throw new RuntimeException("unsupported selector type:" + selectorType);
}
- InstanceSelector.SelectionResult selectionResult =
- selector.select(_brokerRequest, Lists.newArrayList(onlineSegments), requestId);
- assertEquals(selectionResult.getSegmentToInstanceMap(), expectedSelectionResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
}
// Advance the clock to make newSeg to old segment.
@@ -1453,21 +1454,22 @@ public class InstanceSelectorTest {
selector.init(enabledInstances, idealState, externalView, onlineSegments);
{
int requestId = 0;
- Map<String, String> expectedSelectionResult;
+ InstanceSelector.SelectionResult selectionResult =
+ selector.select(_brokerRequest, Lists.newArrayList(onlineSegments), requestId);
switch (selectorType) {
case BALANCED_INSTANCE_SELECTOR: // fall through
- case RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE:
- expectedSelectionResult = ImmutableMap.of(oldSeg, instance0, newSeg, instance1);
+ case REPLICA_GROUP_INSTANCE_SELECTOR_TYPE:
+ assertEquals(selectionResult.getSegmentToInstanceMap(),
+ ImmutableMap.of(oldSeg, instance0, newSeg, instance1));
break;
case STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE:
- expectedSelectionResult = ImmutableMap.of(oldSeg, instance1, newSeg, instance1);
+ assertEquals(selectionResult.getSegmentToInstanceMap(),
+ ImmutableMap.of(oldSeg, instance1, newSeg, instance1));
break;
default:
throw new RuntimeException("unsupported selector type:" + selectorType);
}
- InstanceSelector.SelectionResult selectionResult =
- selector.select(_brokerRequest, Lists.newArrayList(onlineSegments), requestId);
- assertEquals(selectionResult.getSegmentToInstanceMap(), expectedSelectionResult);
+ assertTrue(selectionResult.getOptionalSegmentToInstanceMap().isEmpty());
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
}
{
@@ -1476,6 +1478,7 @@ public class InstanceSelectorTest {
InstanceSelector.SelectionResult selectionResult =
selector.select(_brokerRequest, Lists.newArrayList(onlineSegments), requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedSelectionResult);
+ assertTrue(selectionResult.getOptionalSegmentToInstanceMap().isEmpty());
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
}
}
@@ -1523,19 +1526,21 @@ public class InstanceSelectorTest {
InstanceSelector.SelectionResult selectionResult =
selector.select(_brokerRequest, Lists.newArrayList(onlineSegments), requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedResult);
+ assertEquals(selectionResult.getOptionalSegmentToInstanceMap(), ImmutableMap.of(newSeg, instance0));
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
// Advance the clock to make newSeg to old segment and we see newSeg is reported as unavailable segment.
_mutableClock.fastForward(Duration.ofMillis(NEW_SEGMENT_EXPIRATION_MILLIS + 10));
selector.init(enabledInstances, idealState, externalView, onlineSegments);
selectionResult = selector.select(_brokerRequest, Lists.newArrayList(onlineSegments), requestId);
- if (selectorType == STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE) {
+ if (STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equals(selectorType)) {
expectedResult = ImmutableMap.of();
assertEquals(selectionResult.getUnavailableSegments(), ImmutableList.of(newSeg, oldSeg));
} else {
assertEquals(selectionResult.getUnavailableSegments(), ImmutableList.of(newSeg));
}
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedResult);
+ assertTrue(selectionResult.getOptionalSegmentToInstanceMap().isEmpty());
}
@Test(dataProvider = "selectorType")
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/request/InstanceRequest.java b/pinot-common/src/main/java/org/apache/pinot/common/request/InstanceRequest.java
index 24870f77be..d266fef76b 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/request/InstanceRequest.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/request/InstanceRequest.java
@@ -25,7 +25,7 @@
package org.apache.pinot.common.request;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.15.0)", date = "2023-09-27")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.15.0)", date = "2023-11-16")
public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest, InstanceRequest._Fields>, java.io.Serializable, Cloneable, Comparable<InstanceRequest> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("InstanceRequest");
@@ -34,6 +34,7 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
private static final org.apache.thrift.protocol.TField SEARCH_SEGMENTS_FIELD_DESC = new org.apache.thrift.protocol.TField("searchSegments", org.apache.thrift.protocol.TType.LIST, (short)3);
private static final org.apache.thrift.protocol.TField ENABLE_TRACE_FIELD_DESC = new org.apache.thrift.protocol.TField("enableTrace", org.apache.thrift.protocol.TType.BOOL, (short)4);
private static final org.apache.thrift.protocol.TField BROKER_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("brokerId", org.apache.thrift.protocol.TType.STRING, (short)5);
+ private static final org.apache.thrift.protocol.TField OPTIONAL_SEGMENTS_FIELD_DESC = new org.apache.thrift.protocol.TField("optionalSegments", org.apache.thrift.protocol.TType.LIST, (short)6);
private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new InstanceRequestStandardSchemeFactory();
private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new InstanceRequestTupleSchemeFactory();
@@ -43,6 +44,7 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
private @org.apache.thrift.annotation.Nullable java.util.List<java.lang.String> searchSegments; // optional
private boolean enableTrace; // optional
private @org.apache.thrift.annotation.Nullable java.lang.String brokerId; // optional
+ private @org.apache.thrift.annotation.Nullable java.util.List<java.lang.String> optionalSegments; // optional
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -50,7 +52,8 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
QUERY((short)2, "query"),
SEARCH_SEGMENTS((short)3, "searchSegments"),
ENABLE_TRACE((short)4, "enableTrace"),
- BROKER_ID((short)5, "brokerId");
+ BROKER_ID((short)5, "brokerId"),
+ OPTIONAL_SEGMENTS((short)6, "optionalSegments");
private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
@@ -76,6 +79,8 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
return ENABLE_TRACE;
case 5: // BROKER_ID
return BROKER_ID;
+ case 6: // OPTIONAL_SEGMENTS
+ return OPTIONAL_SEGMENTS;
default:
return null;
}
@@ -120,7 +125,7 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
private static final int __REQUESTID_ISSET_ID = 0;
private static final int __ENABLETRACE_ISSET_ID = 1;
private byte __isset_bitfield = 0;
- private static final _Fields optionals[] = {_Fields.SEARCH_SEGMENTS,_Fields.ENABLE_TRACE,_Fields.BROKER_ID};
+ private static final _Fields optionals[] = {_Fields.SEARCH_SEGMENTS,_Fields.ENABLE_TRACE,_Fields.BROKER_ID,_Fields.OPTIONAL_SEGMENTS};
public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -135,6 +140,9 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
tmpMap.put(_Fields.BROKER_ID, new org.apache.thrift.meta_data.FieldMetaData("brokerId", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.OPTIONAL_SEGMENTS, new org.apache.thrift.meta_data.FieldMetaData("optionalSegments", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(InstanceRequest.class, metaDataMap);
}
@@ -169,6 +177,10 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
if (other.isSetBrokerId()) {
this.brokerId = other.brokerId;
}
+ if (other.isSetOptionalSegments()) {
+ java.util.List<java.lang.String> __this__optionalSegments = new java.util.ArrayList<java.lang.String>(other.optionalSegments);
+ this.optionalSegments = __this__optionalSegments;
+ }
}
public InstanceRequest deepCopy() {
@@ -184,6 +196,7 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
setEnableTraceIsSet(false);
this.enableTrace = false;
this.brokerId = null;
+ this.optionalSegments = null;
}
public long getRequestId() {
@@ -318,6 +331,46 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
}
}
+ public int getOptionalSegmentsSize() {
+ return (this.optionalSegments == null) ? 0 : this.optionalSegments.size();
+ }
+
+ @org.apache.thrift.annotation.Nullable
+ public java.util.Iterator<java.lang.String> getOptionalSegmentsIterator() {
+ return (this.optionalSegments == null) ? null : this.optionalSegments.iterator();
+ }
+
+ public void addToOptionalSegments(java.lang.String elem) {
+ if (this.optionalSegments == null) {
+ this.optionalSegments = new java.util.ArrayList<java.lang.String>();
+ }
+ this.optionalSegments.add(elem);
+ }
+
+ @org.apache.thrift.annotation.Nullable
+ public java.util.List<java.lang.String> getOptionalSegments() {
+ return this.optionalSegments;
+ }
+
+ public void setOptionalSegments(@org.apache.thrift.annotation.Nullable java.util.List<java.lang.String> optionalSegments) {
+ this.optionalSegments = optionalSegments;
+ }
+
+ public void unsetOptionalSegments() {
+ this.optionalSegments = null;
+ }
+
+ /** Returns true if field optionalSegments is set (has been assigned a value) and false otherwise */
+ public boolean isSetOptionalSegments() {
+ return this.optionalSegments != null;
+ }
+
+ public void setOptionalSegmentsIsSet(boolean value) {
+ if (!value) {
+ this.optionalSegments = null;
+ }
+ }
+
public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
switch (field) {
case REQUEST_ID:
@@ -360,6 +413,14 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
}
break;
+ case OPTIONAL_SEGMENTS:
+ if (value == null) {
+ unsetOptionalSegments();
+ } else {
+ setOptionalSegments((java.util.List<java.lang.String>)value);
+ }
+ break;
+
}
}
@@ -381,6 +442,9 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
case BROKER_ID:
return getBrokerId();
+ case OPTIONAL_SEGMENTS:
+ return getOptionalSegments();
+
}
throw new java.lang.IllegalStateException();
}
@@ -402,6 +466,8 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
return isSetEnableTrace();
case BROKER_ID:
return isSetBrokerId();
+ case OPTIONAL_SEGMENTS:
+ return isSetOptionalSegments();
}
throw new java.lang.IllegalStateException();
}
@@ -464,6 +530,15 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
return false;
}
+ boolean this_present_optionalSegments = true && this.isSetOptionalSegments();
+ boolean that_present_optionalSegments = true && that.isSetOptionalSegments();
+ if (this_present_optionalSegments || that_present_optionalSegments) {
+ if (!(this_present_optionalSegments && that_present_optionalSegments))
+ return false;
+ if (!this.optionalSegments.equals(that.optionalSegments))
+ return false;
+ }
+
return true;
}
@@ -489,6 +564,10 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
if (isSetBrokerId())
hashCode = hashCode * 8191 + brokerId.hashCode();
+ hashCode = hashCode * 8191 + ((isSetOptionalSegments()) ? 131071 : 524287);
+ if (isSetOptionalSegments())
+ hashCode = hashCode * 8191 + optionalSegments.hashCode();
+
return hashCode;
}
@@ -550,6 +629,16 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
return lastComparison;
}
}
+ lastComparison = java.lang.Boolean.compare(isSetOptionalSegments(), other.isSetOptionalSegments());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetOptionalSegments()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.optionalSegments, other.optionalSegments);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -608,6 +697,16 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
}
first = false;
}
+ if (isSetOptionalSegments()) {
+ if (!first) sb.append(", ");
+ sb.append("optionalSegments:");
+ if (this.optionalSegments == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.optionalSegments);
+ }
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
@@ -715,6 +814,24 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 6: // OPTIONAL_SEGMENTS
+ if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
+ {
+ org.apache.thrift.protocol.TList _list3 = iprot.readListBegin();
+ struct.optionalSegments = new java.util.ArrayList<java.lang.String>(_list3.size);
+ @org.apache.thrift.annotation.Nullable java.lang.String _elem4;
+ for (int _i5 = 0; _i5 < _list3.size; ++_i5)
+ {
+ _elem4 = iprot.readString();
+ struct.optionalSegments.add(_elem4);
+ }
+ iprot.readListEnd();
+ }
+ struct.setOptionalSegmentsIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -741,9 +858,9 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
oprot.writeFieldBegin(SEARCH_SEGMENTS_FIELD_DESC);
{
oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, struct.searchSegments.size()));
- for (java.lang.String _iter3 : struct.searchSegments)
+ for (java.lang.String _iter6 : struct.searchSegments)
{
- oprot.writeString(_iter3);
+ oprot.writeString(_iter6);
}
oprot.writeListEnd();
}
@@ -762,6 +879,20 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
oprot.writeFieldEnd();
}
}
+ if (struct.optionalSegments != null) {
+ if (struct.isSetOptionalSegments()) {
+ oprot.writeFieldBegin(OPTIONAL_SEGMENTS_FIELD_DESC);
+ {
+ oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, struct.optionalSegments.size()));
+ for (java.lang.String _iter7 : struct.optionalSegments)
+ {
+ oprot.writeString(_iter7);
+ }
+ oprot.writeListEnd();
+ }
+ oprot.writeFieldEnd();
+ }
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -791,13 +922,16 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
if (struct.isSetBrokerId()) {
optionals.set(2);
}
- oprot.writeBitSet(optionals, 3);
+ if (struct.isSetOptionalSegments()) {
+ optionals.set(3);
+ }
+ oprot.writeBitSet(optionals, 4);
if (struct.isSetSearchSegments()) {
{
oprot.writeI32(struct.searchSegments.size());
- for (java.lang.String _iter4 : struct.searchSegments)
+ for (java.lang.String _iter8 : struct.searchSegments)
{
- oprot.writeString(_iter4);
+ oprot.writeString(_iter8);
}
}
}
@@ -807,6 +941,15 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
if (struct.isSetBrokerId()) {
oprot.writeString(struct.brokerId);
}
+ if (struct.isSetOptionalSegments()) {
+ {
+ oprot.writeI32(struct.optionalSegments.size());
+ for (java.lang.String _iter9 : struct.optionalSegments)
+ {
+ oprot.writeString(_iter9);
+ }
+ }
+ }
}
@Override
@@ -817,16 +960,16 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
struct.query = new BrokerRequest();
struct.query.read(iprot);
struct.setQueryIsSet(true);
- java.util.BitSet incoming = iprot.readBitSet(3);
+ java.util.BitSet incoming = iprot.readBitSet(4);
if (incoming.get(0)) {
{
- org.apache.thrift.protocol.TList _list5 = iprot.readListBegin(org.apache.thrift.protocol.TType.STRING);
- struct.searchSegments = new java.util.ArrayList<java.lang.String>(_list5.size);
- @org.apache.thrift.annotation.Nullable java.lang.String _elem6;
- for (int _i7 = 0; _i7 < _list5.size; ++_i7)
+ org.apache.thrift.protocol.TList _list10 = iprot.readListBegin(org.apache.thrift.protocol.TType.STRING);
+ struct.searchSegments = new java.util.ArrayList<java.lang.String>(_list10.size);
+ @org.apache.thrift.annotation.Nullable java.lang.String _elem11;
+ for (int _i12 = 0; _i12 < _list10.size; ++_i12)
{
- _elem6 = iprot.readString();
- struct.searchSegments.add(_elem6);
+ _elem11 = iprot.readString();
+ struct.searchSegments.add(_elem11);
}
}
struct.setSearchSegmentsIsSet(true);
@@ -839,6 +982,19 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
struct.brokerId = iprot.readString();
struct.setBrokerIdIsSet(true);
}
+ if (incoming.get(3)) {
+ {
+ org.apache.thrift.protocol.TList _list13 = iprot.readListBegin(org.apache.thrift.protocol.TType.STRING);
+ struct.optionalSegments = new java.util.ArrayList<java.lang.String>(_list13.size);
+ @org.apache.thrift.annotation.Nullable java.lang.String _elem14;
+ for (int _i15 = 0; _i15 < _list13.size; ++_i15)
+ {
+ _elem14 = iprot.readString();
+ struct.optionalSegments.add(_elem14);
+ }
+ }
+ struct.setOptionalSegmentsIsSet(true);
+ }
}
}
diff --git a/pinot-common/src/thrift/request.thrift b/pinot-common/src/thrift/request.thrift
index 384bb66107..225836da54 100644
--- a/pinot-common/src/thrift/request.thrift
+++ b/pinot-common/src/thrift/request.thrift
@@ -51,4 +51,5 @@ struct InstanceRequest {
3: optional list<string> searchSegments;
4: optional bool enableTrace;
5: optional string brokerId;
+ 6: optional list<string> optionalSegments;
}
diff --git a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala
index e6d88755c5..48255233a9 100644
--- a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala
+++ b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.connector.spark.common.reader
+import org.apache.commons.lang3.tuple.Pair
import org.apache.helix.model.InstanceConfig
import org.apache.pinot.common.datatable.DataTable
import org.apache.pinot.common.metrics.BrokerMetrics
@@ -31,7 +32,7 @@ import org.apache.pinot.spi.env.PinotConfiguration
import org.apache.pinot.spi.metrics.PinotMetricUtils
import org.apache.pinot.sql.parsers.CalciteSqlCompiler
-import java.util.{List => JList, Map => JMap}
+import java.util.{Collections, List => JList, Map => JMap}
import scala.collection.JavaConverters._
/**
@@ -92,7 +93,7 @@ private[reader] class PinotServerDataFetcher(
dataTables.filter(_.getNumberOfRows > 0)
}
- private def createRoutingTableForRequest(): JMap[ServerInstance, JList[String]] = {
+ private def createRoutingTableForRequest(): JMap[ServerInstance, Pair[JList[String], JList[String]]] = {
val nullZkId: String = null
val instanceConfig = new InstanceConfig(nullZkId)
instanceConfig.setHostName(pinotSplit.serverAndSegments.serverHost)
@@ -100,15 +101,15 @@ private[reader] class PinotServerDataFetcher(
// TODO: support netty-sec
val serverInstance = new ServerInstance(instanceConfig)
Map(
- serverInstance -> pinotSplit.serverAndSegments.segments.asJava
+ serverInstance -> Pair.of(pinotSplit.serverAndSegments.segments.asJava, List[String]().asJava)
).asJava
}
private def submitRequestToPinotServer(
offlineBrokerRequest: BrokerRequest,
- offlineRoutingTable: JMap[ServerInstance, JList[String]],
+ offlineRoutingTable: JMap[ServerInstance, Pair[JList[String], JList[String]]],
realtimeBrokerRequest: BrokerRequest,
- realtimeRoutingTable: JMap[ServerInstance, JList[String]]): AsyncQueryResponse = {
+ realtimeRoutingTable: JMap[ServerInstance, Pair[JList[String], JList[String]]]): AsyncQueryResponse = {
logInfo(s"Sending request to ${pinotSplit.serverAndSegments.toString}")
queryRouter.submitQuery(
partitionId,
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 1f0e9844fe..2335a0fc33 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -311,6 +311,12 @@ public abstract class BaseTableDataManager implements TableDataManager {
@Override
public List<SegmentDataManager> acquireSegments(List<String> segmentNames, List<String> missingSegments) {
+ return acquireSegments(segmentNames, null, missingSegments);
+ }
+
+ @Override
+ public List<SegmentDataManager> acquireSegments(List<String> segmentNames,
+ @Nullable List<String> optionalSegmentNames, List<String> missingSegments) {
List<SegmentDataManager> segmentDataManagers = new ArrayList<>();
for (String segmentName : segmentNames) {
SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName);
@@ -320,6 +326,15 @@ public abstract class BaseTableDataManager implements TableDataManager {
missingSegments.add(segmentName);
}
}
+ if (optionalSegmentNames != null) {
+ for (String segmentName : optionalSegmentNames) {
+ SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName);
+ // Optional segments that cannot be acquired are skipped and not reported as missing in the query exception.
+ if (segmentDataManager != null && segmentDataManager.increaseReferenceCount()) {
+ segmentDataManagers.add(segmentDataManager);
+ }
+ }
+ }
return segmentDataManagers;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index ced6c62fb2..b8c5383bbb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -200,10 +200,16 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
}
List<String> segmentsToQuery = queryRequest.getSegmentsToQuery();
+ List<String> optionalSegments = queryRequest.getOptionalSegments();
List<String> notAcquiredSegments = new ArrayList<>();
List<SegmentDataManager> segmentDataManagers =
- tableDataManager.acquireSegments(segmentsToQuery, notAcquiredSegments);
+ tableDataManager.acquireSegments(segmentsToQuery, optionalSegments, notAcquiredSegments);
int numSegmentsAcquired = segmentDataManagers.size();
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Processing requestId: {} with segmentsToQuery: {}, optionalSegments: {} and acquiredSegments: {}",
+ requestId, segmentsToQuery, optionalSegments,
+ segmentDataManagers.stream().map(SegmentDataManager::getSegmentName).collect(Collectors.toList()));
+ }
List<IndexSegment> indexSegments = new ArrayList<>(numSegmentsAcquired);
for (SegmentDataManager segmentDataManager : segmentDataManagers) {
indexSegments.add(segmentDataManager.getSegment());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/logger/ServerQueryLogger.java b/pinot-core/src/main/java/org/apache/pinot/core/query/logger/ServerQueryLogger.java
index 2d72fc04ef..a9908a87b1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/logger/ServerQueryLogger.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/logger/ServerQueryLogger.java
@@ -82,7 +82,7 @@ public class ServerQueryLogger {
getLongValue(responseMetadata, MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(), -1);
addToTableMeter(tableNameWithType, ServerMeter.NUM_ENTRIES_SCANNED_POST_FILTER, numEntriesScannedPostFilter);
- int numSegmentsQueried = request.getSegmentsToQuery().size();
+ long numSegmentsQueried = getLongValue(responseMetadata, MetadataKey.NUM_SEGMENTS_QUERIED.getName(), -1);
addToTableMeter(tableNameWithType, ServerMeter.NUM_SEGMENTS_QUERIED, numSegmentsQueried);
long numSegmentsProcessed = getLongValue(responseMetadata, MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), -1);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
index e444eae880..d4ce7857a5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
@@ -51,6 +51,7 @@ public class ServerQueryRequest {
private final boolean _enableTrace;
private final boolean _enableStreaming;
private final List<String> _segmentsToQuery;
+ private final List<String> _optionalSegments;
private final QueryContext _queryContext;
// Request id might not be unique across brokers or for request hitting a hybrid table. To solve that we may construct
@@ -71,6 +72,7 @@ public class ServerQueryRequest {
_enableTrace = instanceRequest.isEnableTrace();
_enableStreaming = enableStreaming;
_segmentsToQuery = instanceRequest.getSearchSegments();
+ _optionalSegments = instanceRequest.getOptionalSegments();
_queryContext = getQueryContext(instanceRequest.getQuery().getPinotQuery());
_queryId = QueryIdUtils.getQueryId(_brokerId, _requestId,
TableNameBuilder.getTableTypeFromTableName(_queryContext.getTableName()));
@@ -88,6 +90,8 @@ public class ServerQueryRequest {
_enableStreaming = Boolean.parseBoolean(metadata.get(Request.MetadataKeys.ENABLE_STREAMING));
_segmentsToQuery = serverRequest.getSegmentsList();
+ // TODO: support optional segments for GrpcQueryServer
+ _optionalSegments = null;
BrokerRequest brokerRequest;
String payloadType = metadata.getOrDefault(Request.MetadataKeys.PAYLOAD_TYPE, Request.PayloadType.SQL);
@@ -139,6 +143,10 @@ public class ServerQueryRequest {
return _segmentsToQuery;
}
+ public List<String> getOptionalSegments() {
+ return _optionalSegments;
+ }
+
public QueryContext getQueryContext() {
return _queryContext;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java
index 3491dcd821..ccc6aedb81 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java
@@ -20,22 +20,27 @@ package org.apache.pinot.core.routing;
import java.util.List;
import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.core.transport.ServerInstance;
public class RoutingTable {
- private final Map<ServerInstance, List<String>> _serverInstanceToSegmentsMap;
+ // Optional segments are segments that are not online in the ExternalView but may already be online on the
+ // servers, e.g. newly created consuming segments. Brokers used to simply skip such segments at query routing time,
+ // which could cause wrong query results, particularly for upsert tables. Instead, they are passed to the servers,
+ // which decide how to handle them, e.g. skip them upon issues or include them for better query results.
+ private final Map<ServerInstance, Pair<List<String>, List<String>/*optional segments*/>> _serverInstanceToSegmentsMap;
private final List<String> _unavailableSegments;
private final int _numPrunedSegments;
- public RoutingTable(Map<ServerInstance, List<String>> serverInstanceToSegmentsMap, List<String> unavailableSegments,
- int numPrunedSegments) {
+ public RoutingTable(Map<ServerInstance, Pair<List<String>, List<String>>> serverInstanceToSegmentsMap,
+ List<String> unavailableSegments, int numPrunedSegments) {
_serverInstanceToSegmentsMap = serverInstanceToSegmentsMap;
_unavailableSegments = unavailableSegments;
_numPrunedSegments = numPrunedSegments;
}
- public Map<ServerInstance, List<String>> getServerInstanceToSegmentsMap() {
+ public Map<ServerInstance, Pair<List<String>, List<String>>> getServerInstanceToSegmentsMap() {
return _serverInstanceToSegmentsMap;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
index e0ff080e11..086211ad5f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
@@ -25,6 +25,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.config.NettyConfig;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.datatable.DataTable;
@@ -85,9 +87,10 @@ public class QueryRouter {
}
public AsyncQueryResponse submitQuery(long requestId, String rawTableName,
- @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<ServerInstance, List<String>> offlineRoutingTable,
- @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable,
- long timeoutMs) {
+ @Nullable BrokerRequest offlineBrokerRequest,
+ @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable,
+ @Nullable BrokerRequest realtimeBrokerRequest,
+ @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable, long timeoutMs) {
assert offlineBrokerRequest != null || realtimeBrokerRequest != null;
// can prefer but not require TLS until all servers guaranteed to be on TLS
@@ -97,7 +100,7 @@ public class QueryRouter {
Map<ServerRoutingInstance, InstanceRequest> requestMap = new HashMap<>();
if (offlineBrokerRequest != null) {
assert offlineRoutingTable != null;
- for (Map.Entry<ServerInstance, List<String>> entry : offlineRoutingTable.entrySet()) {
+ for (Map.Entry<ServerInstance, Pair<List<String>, List<String>>> entry : offlineRoutingTable.entrySet()) {
ServerRoutingInstance serverRoutingInstance =
entry.getKey().toServerRoutingInstance(TableType.OFFLINE, preferTls);
InstanceRequest instanceRequest = getInstanceRequest(requestId, offlineBrokerRequest, entry.getValue());
@@ -106,7 +109,7 @@ public class QueryRouter {
}
if (realtimeBrokerRequest != null) {
assert realtimeRoutingTable != null;
- for (Map.Entry<ServerInstance, List<String>> entry : realtimeRoutingTable.entrySet()) {
+ for (Map.Entry<ServerInstance, Pair<List<String>, List<String>>> entry : realtimeRoutingTable.entrySet()) {
ServerRoutingInstance serverRoutingInstance =
entry.getKey().toServerRoutingInstance(TableType.REALTIME, preferTls);
InstanceRequest instanceRequest = getInstanceRequest(requestId, realtimeBrokerRequest, entry.getValue());
@@ -195,7 +198,8 @@ public class QueryRouter {
_asyncQueryResponseMap.remove(requestId);
}
- private InstanceRequest getInstanceRequest(long requestId, BrokerRequest brokerRequest, List<String> segments) {
+ private InstanceRequest getInstanceRequest(long requestId, BrokerRequest brokerRequest,
+ Pair<List<String>, List<String>> segments) {
InstanceRequest instanceRequest = new InstanceRequest();
instanceRequest.setRequestId(requestId);
instanceRequest.setQuery(brokerRequest);
@@ -203,8 +207,14 @@ public class QueryRouter {
if (queryOptions != null) {
instanceRequest.setEnableTrace(Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE)));
}
- instanceRequest.setSearchSegments(segments);
+ instanceRequest.setSearchSegments(segments.getLeft());
instanceRequest.setBrokerId(_brokerId);
+ if (CollectionUtils.isNotEmpty(segments.getRight())) {
+ // Leave this field unset (i.e. null) when there are no optional segments, for better backward compatibility:
+ // some places, such as the multi-stage query engine, do not set this field today when creating the
+ // InstanceRequest.
+ instanceRequest.setOptionalSegments(segments.getRight());
+ }
return instanceRequest;
}
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
index 2a30eefa2f..07aa0e2c08 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.datatable.DataTable.MetadataKey;
import org.apache.pinot.common.metrics.BrokerMetrics;
@@ -60,8 +61,8 @@ public class QueryRoutingTest {
SERVER_INSTANCE.toServerRoutingInstance(TableType.REALTIME, ServerInstance.RoutingType.NETTY);
private static final BrokerRequest BROKER_REQUEST =
CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM testTable");
- private static final Map<ServerInstance, List<String>> ROUTING_TABLE =
- Collections.singletonMap(SERVER_INSTANCE, Collections.emptyList());
+ private static final Map<ServerInstance, Pair<List<String>, List<String>>> ROUTING_TABLE =
+ Collections.singletonMap(SERVER_INSTANCE, Pair.of(Collections.emptyList(), Collections.emptyList()));
private QueryRouter _queryRouter;
private ServerRoutingStatsManager _serverRoutingStatsManager;
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 5029321162..75164add69 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -30,6 +30,7 @@ import java.util.Random;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.calcite.rel.hint.PinotHintOptions;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.core.routing.RoutingTable;
import org.apache.pinot.core.routing.TablePartitionInfo;
@@ -145,11 +146,12 @@ public class WorkerManager {
String tableType = routingEntry.getKey();
RoutingTable routingTable = routingEntry.getValue();
// for each server instance, attach all table types and their associated segment list.
- for (Map.Entry<ServerInstance, List<String>> serverEntry : routingTable.getServerInstanceToSegmentsMap()
- .entrySet()) {
- serverInstanceToSegmentsMap.putIfAbsent(serverEntry.getKey(), new HashMap<>());
- Map<String, List<String>> tableTypeToSegmentListMap = serverInstanceToSegmentsMap.get(serverEntry.getKey());
- Preconditions.checkState(tableTypeToSegmentListMap.put(tableType, serverEntry.getValue()) == null,
+ Map<ServerInstance, Pair<List<String>, List<String>>> segmentsMap = routingTable.getServerInstanceToSegmentsMap();
+ for (Map.Entry<ServerInstance, Pair<List<String>, List<String>>> serverEntry : segmentsMap.entrySet()) {
+ Map<String, List<String>> tableTypeToSegmentListMap =
+ serverInstanceToSegmentsMap.computeIfAbsent(serverEntry.getKey(), k -> new HashMap<>());
+ // TODO: support optional segments for multi-stage engine.
+ Preconditions.checkState(tableTypeToSegmentListMap.put(tableType, serverEntry.getValue().getLeft()) == null,
"Entry for server {} and table type: {} already exist!", serverEntry.getKey(), tableType);
}
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
index 41bf57700c..4539627a23 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.config.provider.TableCache;
@@ -56,7 +57,7 @@ public class MockRoutingManagerFactory {
private final Map<String, Schema> _schemaMap;
private final Set<String> _hybridTables;
private final Map<String, ServerInstance> _serverInstances;
- private final Map<String, Map<ServerInstance, List<String>>> _tableServerSegmentsMap;
+ private final Map<String, Map<ServerInstance, Pair<List<String>, List<String>>>> _tableServerSegmentsMap;
public MockRoutingManagerFactory(int... ports) {
_tableNameMap = new HashMap<>();
@@ -87,16 +88,15 @@ public class MockRoutingManagerFactory {
public void registerSegment(int insertToServerPort, String tableNameWithType, String segmentName) {
ServerInstance serverInstance = _serverInstances.get(toHostname(insertToServerPort));
_tableServerSegmentsMap.computeIfAbsent(tableNameWithType, k -> new HashMap<>())
- .computeIfAbsent(serverInstance, k -> new ArrayList<>()).add(segmentName);
+ .computeIfAbsent(serverInstance, k -> Pair.of(new ArrayList<>(), null)).getLeft().add(segmentName);
}
public RoutingManager buildRoutingManager(@Nullable Map<String, TablePartitionInfo> partitionInfoMap) {
Map<String, RoutingTable> routingTableMap = new HashMap<>();
- for (Map.Entry<String, Map<ServerInstance, List<String>>> tableEntry : _tableServerSegmentsMap.entrySet()) {
- String tableNameWithType = tableEntry.getKey();
- RoutingTable fakeRoutingTable = new RoutingTable(tableEntry.getValue(), Collections.emptyList(), 0);
+ _tableServerSegmentsMap.forEach((tableNameWithType, serverSegmentsMap) -> {
+ RoutingTable fakeRoutingTable = new RoutingTable(serverSegmentsMap, Collections.emptyList(), 0);
routingTableMap.put(tableNameWithType, fakeRoutingTable);
- }
+ });
return new FakeRoutingManager(routingTableMap, _hybridTables, partitionInfoMap, _serverInstances);
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
index c3a34cbd20..6bd3d617b3 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
@@ -46,6 +46,7 @@ import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -150,7 +151,8 @@ public class MockInstanceDataManagerFactory {
Map<String, SegmentDataManager> segmentDataManagerMap =
segmentList.stream().collect(Collectors.toMap(IndexSegment::getSegmentName, ImmutableSegmentDataManager::new));
TableDataManager tableDataManager = mock(TableDataManager.class);
- when(tableDataManager.acquireSegments(anyList(), anyList())).thenAnswer(invocation -> {
+ // TODO: support optional segments for the multi-stage engine; for now the optional segment list is always null.
+ when(tableDataManager.acquireSegments(anyList(), eq(null), anyList())).thenAnswer(invocation -> {
List<String> segments = invocation.getArgument(0);
return segments.stream().map(segmentDataManagerMap::get).collect(Collectors.toList());
});
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index 2de9c586c0..0b820f50c2 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -170,6 +170,11 @@ public interface TableDataManager {
*/
List<SegmentDataManager> acquireSegments(List<String> segmentNames, List<String> missingSegments);
+ default List<SegmentDataManager> acquireSegments(List<String> segmentNames,
+ @Nullable List<String> optionalSegmentNames, List<String> missingSegments) {
+ return acquireSegments(segmentNames, missingSegments);
+ }
+
/**
* Acquires the segments with the given segment name.
* <p>It is the caller's responsibility to return the segments by calling {@link #releaseSegment(SegmentDataManager)}.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org