You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by yu...@apache.org on 2022/11/29 16:55:31 UTC
[pinot] branch master updated: [multistage] Add Multi Stage Strict Replica Group Routing Strategy (#9808)
This is an automated email from the ASF dual-hosted git repository.
yupeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e41bdd0c6f [multistage] Add Multi Stage Strict Replica Group Routing Strategy (#9808)
e41bdd0c6f is described below
commit e41bdd0c6ff851ca65102fb7bdc83161753e839c
Author: Ankit Sultana <an...@uber.com>
AuthorDate: Tue Nov 29 22:25:23 2022 +0530
[multistage] Add Multi Stage Strict Replica Group Routing Strategy (#9808)
* Rebase on master
* Cleanup
* Add tests
* Fix test
* Address feedback
* Mark feature is in Beta
* Address feedback
* Address feedback
* Rename to MultiStageReplicaGroup
---
.../broker/api/resources/PinotBrokerDebug.java | 15 +-
.../requesthandler/BaseBrokerRequestHandler.java | 4 +-
.../MultiStageBrokerRequestHandler.java | 2 +-
.../pinot/broker/routing/BrokerRoutingManager.java | 13 +-
.../instanceselector/BaseInstanceSelector.java | 10 +-
.../routing/instanceselector/InstanceSelector.java | 7 +-
.../instanceselector/InstanceSelectorFactory.java | 11 +-
.../MultiStageReplicaGroupSelector.java | 150 +++++++++++
.../broker/broker/HelixBrokerStarterTest.java | 7 +-
.../BaseBrokerRequestHandlerTest.java | 2 +-
.../instanceselector/InstanceSelectorTest.java | 277 +++++++++++++++------
.../apache/pinot/core/routing/RoutingManager.java | 2 +-
.../org/apache/pinot/query/QueryEnvironment.java | 16 +-
.../pinot/query/planner/logical/StagePlanner.java | 6 +-
.../apache/pinot/query/routing/WorkerManager.java | 16 +-
.../query/testutils/MockRoutingManagerFactory.java | 2 +-
.../pinot/spi/config/table/RoutingConfig.java | 1 +
17 files changed, 426 insertions(+), 115 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
index daa17a0e5c..f9eb55b535 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
@@ -30,6 +30,7 @@ import io.swagger.annotations.SwaggerDefinition;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@@ -59,6 +60,9 @@ import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_K
// TODO: Add APIs to return the RoutingTable (with unavailable segments)
public class PinotBrokerDebug {
+ // Request ID is passed to the RoutingManager to rotate the selected replica-group.
+ private final AtomicLong _requestIdGenerator = new AtomicLong();
+
@Inject
private BrokerRoutingManager _routingManager;
@@ -102,7 +106,7 @@ public class PinotBrokerDebug {
if (tableType != TableType.REALTIME) {
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
RoutingTable routingTable = _routingManager.getRoutingTable(
- CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + offlineTableName));
+ CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + offlineTableName), getRequestId());
if (routingTable != null) {
result.put(offlineTableName, routingTable.getServerInstanceToSegmentsMap());
}
@@ -110,7 +114,7 @@ public class PinotBrokerDebug {
if (tableType != TableType.OFFLINE) {
String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
RoutingTable routingTable = _routingManager.getRoutingTable(
- CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + realtimeTableName));
+ CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + realtimeTableName), getRequestId());
if (routingTable != null) {
result.put(realtimeTableName, routingTable.getServerInstanceToSegmentsMap());
}
@@ -133,7 +137,8 @@ public class PinotBrokerDebug {
})
public Map<ServerInstance, List<String>> getRoutingTableForQuery(
@ApiParam(value = "SQL query (table name should have type suffix)") @QueryParam("query") String query) {
- RoutingTable routingTable = _routingManager.getRoutingTable(CalciteSqlCompiler.compileToBrokerRequest(query));
+ RoutingTable routingTable = _routingManager.getRoutingTable(CalciteSqlCompiler.compileToBrokerRequest(query),
+ getRequestId());
if (routingTable != null) {
return routingTable.getServerInstanceToSegmentsMap();
} else {
@@ -157,4 +162,8 @@ public class PinotBrokerDebug {
public String getServerRoutingStats() {
return _serverRoutingStatsManager.getServerRoutingStatsStr();
}
+
+ private long getRequestId() {
+ return _requestIdGenerator.getAndIncrement();
+ }
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index ade86b61bd..ae88689a51 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -551,7 +551,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
int numPrunedSegmentsTotal = 0;
if (offlineBrokerRequest != null) {
// NOTE: Routing table might be null if table is just removed
- RoutingTable routingTable = _routingManager.getRoutingTable(offlineBrokerRequest);
+ RoutingTable routingTable = _routingManager.getRoutingTable(offlineBrokerRequest, requestId);
if (routingTable != null) {
unavailableSegments.addAll(routingTable.getUnavailableSegments());
Map<ServerInstance, List<String>> serverInstanceToSegmentsMap = routingTable.getServerInstanceToSegmentsMap();
@@ -567,7 +567,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
}
if (realtimeBrokerRequest != null) {
// NOTE: Routing table might be null if table is just removed
- RoutingTable routingTable = _routingManager.getRoutingTable(realtimeBrokerRequest);
+ RoutingTable routingTable = _routingManager.getRoutingTable(realtimeBrokerRequest, requestId);
if (routingTable != null) {
unavailableSegments.addAll(routingTable.getUnavailableSegments());
Map<ServerInstance, List<String>> serverInstanceToSegmentsMap = routingTable.getServerInstanceToSegmentsMap();
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 0b1a67efbb..75e8d8d3f0 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -144,7 +144,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
return constructMultistageExplainPlan(query, plan);
case SELECT:
default:
- queryPlan = _queryEnvironment.planQuery(query, sqlNodeAndOptions);
+ queryPlan = _queryEnvironment.planQuery(query, sqlNodeAndOptions, requestId);
break;
}
} catch (Exception e) {
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
index ee0982606d..883e9cfb02 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
@@ -81,7 +81,7 @@ import org.slf4j.LoggerFactory;
* <li>{@link #removeRouting(String)}: Removes the routing for a table</li>
* <li>{@link #refreshSegment(String, String)}: Refreshes the metadata for a segment</li>
* <li>{@link #routingExists(String)}: Returns whether the routing exists for a table</li>
- * <li>{@link #getRoutingTable(BrokerRequest)}: Returns the routing table for a query</li>
+ * <li>{@link #getRoutingTable(BrokerRequest, long)}: Returns the routing table for a query</li>
* <li>{@link #getTimeBoundaryInfo(String)}: Returns the time boundary info for a table</li>
* <li>{@link #getQueryTimeoutMs(String)}: Returns the table-level query timeout in milliseconds for a table</li>
* </ul>
@@ -437,7 +437,8 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle
AdaptiveServerSelector adaptiveServerSelector =
AdaptiveServerSelectorFactory.getAdaptiveServerSelector(_serverRoutingStatsManager, _pinotConfig);
InstanceSelector instanceSelector =
- InstanceSelectorFactory.getInstanceSelector(tableConfig, _brokerMetrics, adaptiveServerSelector);
+ InstanceSelectorFactory.getInstanceSelector(tableConfig, _propertyStore, _brokerMetrics,
+ adaptiveServerSelector);
instanceSelector.init(_routableServers, idealState, externalView, preSelectedOnlineSegments);
// Add time boundary manager if both offline and real-time part exist for a hybrid table
@@ -567,13 +568,13 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle
*/
@Nullable
@Override
- public RoutingTable getRoutingTable(BrokerRequest brokerRequest) {
+ public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId) {
String tableNameWithType = brokerRequest.getQuerySource().getTableName();
RoutingEntry routingEntry = _routingEntryMap.get(tableNameWithType);
if (routingEntry == null) {
return null;
}
- InstanceSelector.SelectionResult selectionResult = routingEntry.calculateRouting(brokerRequest);
+ InstanceSelector.SelectionResult selectionResult = routingEntry.calculateRouting(brokerRequest, requestId);
Map<String, String> segmentToInstanceMap = selectionResult.getSegmentToInstanceMap();
Map<ServerInstance, List<String>> serverInstanceToSegmentsMap = new HashMap<>();
for (Map.Entry<String, String> entry : segmentToInstanceMap.entrySet()) {
@@ -717,7 +718,7 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle
}
}
- InstanceSelector.SelectionResult calculateRouting(BrokerRequest brokerRequest) {
+ InstanceSelector.SelectionResult calculateRouting(BrokerRequest brokerRequest, long requestId) {
Set<String> selectedSegments = _segmentSelector.select(brokerRequest);
int numTotalSelectedSegments = selectedSegments.size();
if (!selectedSegments.isEmpty()) {
@@ -728,7 +729,7 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle
int numPrunedSegments = numTotalSelectedSegments - selectedSegments.size();
if (!selectedSegments.isEmpty()) {
InstanceSelector.SelectionResult selectionResult = _instanceSelector.select(brokerRequest,
- new ArrayList<>(selectedSegments));
+ new ArrayList<>(selectedSegments), requestId);
selectionResult.setNumPrunedSegments(numPrunedSegments);
return selectionResult;
} else {
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
index 9b92d6031c..9325035aca 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
@@ -26,7 +26,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
-import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
@@ -50,9 +49,8 @@ abstract class BaseInstanceSelector implements InstanceSelector {
private static final Logger LOGGER = LoggerFactory.getLogger(BaseInstanceSelector.class);
// To prevent int overflow, reset the request id once it reaches this value
- private static final int MAX_REQUEST_ID = 1_000_000_000;
+ private static final long MAX_REQUEST_ID = 1_000_000_000;
- private final AtomicLong _requestId = new AtomicLong();
private final String _tableNameWithType;
private final BrokerMetrics _brokerMetrics;
protected final AdaptiveServerSelector _adaptiveServerSelector;
@@ -267,13 +265,13 @@ abstract class BaseInstanceSelector implements InstanceSelector {
}
@Override
- public SelectionResult select(BrokerRequest brokerRequest, List<String> segments) {
- int requestId = (int) (_requestId.getAndIncrement() % MAX_REQUEST_ID);
+ public SelectionResult select(BrokerRequest brokerRequest, List<String> segments, long requestId) {
Map<String, String> queryOptions = (brokerRequest.getPinotQuery() != null
&& brokerRequest.getPinotQuery().getQueryOptions() != null)
? brokerRequest.getPinotQuery().getQueryOptions()
: Collections.emptyMap();
- Map<String, String> segmentToInstanceMap = select(segments, requestId, _segmentToEnabledInstancesMap,
+ int requestIdInt = (int) (requestId % MAX_REQUEST_ID);
+ Map<String, String> segmentToInstanceMap = select(segments, requestIdInt, _segmentToEnabledInstancesMap,
queryOptions);
Set<String> unavailableSegments = _unavailableSegments;
if (unavailableSegments.isEmpty()) {
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
index 4edaf69b0a..4c96007fd6 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
@@ -55,8 +55,13 @@ public interface InstanceSelector {
* Selects the server instances for the given segments queried by the given broker request, returns a map from segment
* to selected server instance hosting the segment and a set of unavailable segments (no enabled instance or all
* enabled instances are in ERROR state).
+ *
+ * @param brokerRequest BrokerRequest for the query
+ * @param segments segments for which instance needs to be selected
+ * @param requestId requestId generated by the Broker for a query
+ * @return instance of SelectionResult which describes the instance to pick for a given segment
*/
- SelectionResult select(BrokerRequest brokerRequest, List<String> segments);
+ SelectionResult select(BrokerRequest brokerRequest, List<String> segments, long requestId);
class SelectionResult {
private final Map<String, String> _segmentToInstanceMap;
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
index 0ef46d7b7a..8cc9f260f8 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
@@ -19,6 +19,8 @@
package org.apache.pinot.broker.routing.instanceselector;
import javax.annotation.Nullable;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.spi.config.table.RoutingConfig;
@@ -37,7 +39,8 @@ public class InstanceSelectorFactory {
public static final String LEGACY_REPLICA_GROUP_OFFLINE_ROUTING = "PartitionAwareOffline";
public static final String LEGACY_REPLICA_GROUP_REALTIME_ROUTING = "PartitionAwareRealtime";
- public static InstanceSelector getInstanceSelector(TableConfig tableConfig, BrokerMetrics brokerMetrics,
+ public static InstanceSelector getInstanceSelector(TableConfig tableConfig,
+ ZkHelixPropertyStore<ZNRecord> propertyStore, BrokerMetrics brokerMetrics,
@Nullable AdaptiveServerSelector adaptiveServerSelector) {
String tableNameWithType = tableConfig.getTableName();
RoutingConfig routingConfig = tableConfig.getRoutingConfig();
@@ -55,6 +58,12 @@ public class InstanceSelectorFactory {
LOGGER.info("Using StrictReplicaGroupInstanceSelector for table: {}", tableNameWithType);
return new StrictReplicaGroupInstanceSelector(tableNameWithType, brokerMetrics, adaptiveServerSelector);
}
+ if (RoutingConfig.MULTI_STAGE_REPLICA_GROUP_SELECTOR_TYPE.equalsIgnoreCase(
+ routingConfig.getInstanceSelectorType())) {
+ LOGGER.info("Using {} for table: {}", routingConfig.getInstanceSelectorType(), tableNameWithType);
+ return new MultiStageReplicaGroupSelector(tableNameWithType, propertyStore, brokerMetrics,
+ adaptiveServerSelector);
+ }
}
return new BalancedInstanceSelector(tableNameWithType, brokerMetrics, adaptiveServerSelector);
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
new file mode 100644
index 0000000000..0a6d66510c
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.instanceselector;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Instance selector for multi-stage queries which can ensure that Colocated Tables always leverage Colocated Join
+ * whenever possible. To achieve this, this instance-selector uses InstancePartitions (IP) to determine replica-groups,
+ * as opposed to IdealState used by other instance-selectors. Moreover, this also uses the requestId generated by
+ * Pinot broker to determine the replica-group picked for each table involved in the query, as opposed to using a
+ * member variable. There may be scenarios where an instance in the chosen replica-group is down. In that case, this
+ * strategy will try to pick another replica-group. For realtime tables, this strategy uses only CONSUMING partitions.
+ * This feature is in <strong>Beta</strong>.
+ */
+public class MultiStageReplicaGroupSelector extends BaseInstanceSelector {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MultiStageReplicaGroupSelector.class);
+
+  private final String _tableNameWithType;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
+  // Re-fetched by the routing event callbacks below and read by query threads in select(). Declared volatile so
+  // that query threads observe the most recently fetched InstancePartitions reference.
+  private volatile InstancePartitions _instancePartitions;
+
+  public MultiStageReplicaGroupSelector(String tableNameWithType, ZkHelixPropertyStore<ZNRecord> propertyStore,
+      BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector adaptiveServerSelector) {
+    super(tableNameWithType, brokerMetrics, adaptiveServerSelector);
+    _tableNameWithType = tableNameWithType;
+    _propertyStore = propertyStore;
+  }
+
+  @Override
+  public void init(Set<String> enabledInstances, IdealState idealState, ExternalView externalView,
+      Set<String> onlineSegments) {
+    super.init(enabledInstances, idealState, externalView, onlineSegments);
+    _instancePartitions = getInstancePartitions();
+  }
+
+  @Override
+  public void onInstancesChange(Set<String> enabledInstances, List<String> changedInstances) {
+    super.onInstancesChange(enabledInstances, changedInstances);
+    _instancePartitions = getInstancePartitions();
+  }
+
+  @Override
+  public void onAssignmentChange(IdealState idealState, ExternalView externalView, Set<String> onlineSegments) {
+    super.onAssignmentChange(idealState, externalView, onlineSegments);
+    _instancePartitions = getInstancePartitions();
+  }
+
+  /**
+   * Picks a single replica-group for this request and maps each requested segment to an enabled instance of that
+   * replica-group. The starting replica-group is derived from the request id, so different requests rotate across
+   * replica-groups. If the chosen replica-group cannot serve every requested segment, the next one is tried.
+   *
+   * @param segments segments selected (after pruning) for this query; only these are routed
+   * @param requestId broker-generated request id used to pick the starting replica-group
+   * @param segmentToEnabledInstancesMap map from segment to its enabled instances
+   * @param queryOptions query options (unused by this selector)
+   * @return map from segment name to the selected instance
+   * @throws IllegalStateException if the instance partitions contain no replica-group
+   * @throws RuntimeException if no replica-group can serve all requested segments
+   */
+  @Override
+  Map<String, String> select(List<String> segments, int requestId,
+      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions) {
+    // Read the volatile field once so the whole selection works against one consistent InstancePartitions, even if
+    // an event callback swaps the reference concurrently.
+    InstancePartitions instancePartitions = _instancePartitions;
+    int numReplicaGroups = instancePartitions.getNumReplicaGroups();
+    Preconditions.checkState(numReplicaGroups > 0, "No replica-group found in instance partitions for table: %s",
+        _tableNameWithType);
+    // floorMod keeps the starting replica-group within [0, numReplicaGroups) even for a negative request id.
+    int replicaGroupSelected = Math.floorMod(requestId, numReplicaGroups);
+    for (int iteration = 0; iteration < numReplicaGroups; iteration++) {
+      int replicaGroup = (replicaGroupSelected + iteration) % numReplicaGroups;
+      try {
+        return tryAssigning(segments, segmentToEnabledInstancesMap, instancePartitions, replicaGroup);
+      } catch (Exception e) {
+        LOGGER.warn("Unable to select replica-group {} for table: {}", replicaGroup, _tableNameWithType, e);
+      }
+    }
+    throw new RuntimeException(String.format("Unable to find any replica-group to serve table: %s",
+        _tableNameWithType));
+  }
+
+  /**
+   * Returns a map from each requested segment to an enabled instance hosting it within the given replica-group.
+   * If an enabled segment has no enabled instance in that replica-group, an exception is thrown so the caller can
+   * fall back to another replica-group.
+   */
+  private Map<String, String> tryAssigning(List<String> segments,
+      Map<String, List<String>> segmentToEnabledInstancesMap, InstancePartitions instancePartitions, int replicaId) {
+    // Collect every instance belonging to the replica-group across all partitions.
+    Set<String> instanceLookUpSet = new HashSet<>();
+    for (int partition = 0; partition < instancePartitions.getNumPartitions(); partition++) {
+      instanceLookUpSet.addAll(instancePartitions.getInstances(partition, replicaId));
+    }
+    Map<String, String> result = new HashMap<>();
+    for (String segmentName : segments) {
+      List<String> enabledInstances = segmentToEnabledInstancesMap.get(segmentName);
+      // Segments without an entry in the enabled-instances map are not routed here.
+      // NOTE(review): presumably BaseInstanceSelector reports these through its unavailable-segments set; confirm.
+      if (enabledInstances == null) {
+        continue;
+      }
+      String selectedInstance = null;
+      for (String enabledInstance : enabledInstances) {
+        if (instanceLookUpSet.contains(enabledInstance)) {
+          selectedInstance = enabledInstance;
+          break;
+        }
+      }
+      if (selectedInstance == null) {
+        throw new RuntimeException(String.format("Unable to find an enabled instance for segment: %s", segmentName));
+      }
+      result.put(segmentName, selectedInstance);
+    }
+    return result;
+  }
+
+  /**
+   * Fetches the InstancePartitions used for routing: OFFLINE partitions for offline tables, CONSUMING partitions for
+   * realtime tables.
+   *
+   * @throws NullPointerException if the table type cannot be derived or the instance partitions do not exist
+   */
+  @VisibleForTesting
+  protected InstancePartitions getInstancePartitions() {
+    // TODO: Evaluate whether we need to provide support for COMPLETE partitions.
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(_tableNameWithType);
+    Preconditions.checkNotNull(tableType, "Failed to determine table type for table: %s", _tableNameWithType);
+    InstancePartitionsType instancePartitionsType =
+        tableType == TableType.OFFLINE ? InstancePartitionsType.OFFLINE : InstancePartitionsType.CONSUMING;
+    InstancePartitions instancePartitions = InstancePartitionsUtils.fetchInstancePartitions(_propertyStore,
+        InstancePartitionsUtils.getInstancePartitionsName(_tableNameWithType, instancePartitionsType.name()));
+    Preconditions.checkNotNull(instancePartitions, "Failed to find %s instance partitions for table: %s",
+        instancePartitionsType, _tableNameWithType);
+    return instancePartitions;
+  }
+}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index b2abbd59a9..53e75d0557 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -153,7 +153,7 @@ public class HelixBrokerStarterTest extends ControllerTest {
assertTrue(routingManager.routingExists(REALTIME_TABLE_NAME));
BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + OFFLINE_TABLE_NAME);
- RoutingTable routingTable = routingManager.getRoutingTable(brokerRequest);
+ RoutingTable routingTable = routingManager.getRoutingTable(brokerRequest, 0);
assertNotNull(routingTable);
assertEquals(routingTable.getServerInstanceToSegmentsMap().size(), NUM_SERVERS);
assertEquals(routingTable.getServerInstanceToSegmentsMap().values().iterator().next().size(), NUM_OFFLINE_SEGMENTS);
@@ -164,8 +164,9 @@ public class HelixBrokerStarterTest extends ControllerTest {
SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME), "downloadUrl");
TestUtils.waitForCondition(aVoid ->
- routingManager.getRoutingTable(brokerRequest).getServerInstanceToSegmentsMap().values().iterator().next().size()
- == NUM_OFFLINE_SEGMENTS + 1, 30_000L, "Failed to add the new segment into the routing table");
+ routingManager.getRoutingTable(brokerRequest, 0).getServerInstanceToSegmentsMap()
+ .values().iterator().next().size() == NUM_OFFLINE_SEGMENTS + 1, 30_000L, "Failed to add the new segment "
+ + "into the routing table");
// Add a new table with different broker tenant
String newRawTableName = "newTable";
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
index cff4eff193..88f72600e5 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
@@ -199,7 +199,7 @@ public class BaseBrokerRequestHandlerTest {
RoutingTable rt = mock(RoutingTable.class);
when(rt.getServerInstanceToSegmentsMap()).thenReturn(Collections
.singletonMap(new ServerInstance(new InstanceConfig("server01_9000")), Collections.singletonList("segment01")));
- when(routingManager.getRoutingTable(any())).thenReturn(rt);
+ when(routingManager.getRoutingTable(any(), Mockito.anyLong())).thenReturn(rt);
QueryQuotaManager queryQuotaManager = mock(QueryQuotaManager.class);
when(queryQuotaManager.acquire(anyString())).thenReturn(true);
CountDownLatch latch = new CountDownLatch(1);
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
index 5025782a49..07efbe101a 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
@@ -18,6 +18,9 @@
*/
package org.apache.pinot.broker.routing.instanceselector;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -28,7 +31,10 @@ import java.util.Set;
import java.util.TreeMap;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
+import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.PinotQuery;
@@ -41,10 +47,13 @@ import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.Segmen
import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ERROR;
import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE;
import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
public class InstanceSelectorTest {
@@ -55,26 +64,27 @@ public class InstanceSelectorTest {
public void testInstanceSelectorFactory() {
TableConfig tableConfig = mock(TableConfig.class);
BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
+ ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class);
AdaptiveServerSelector adaptiveServerSelector = null;
// Routing config is missing
- assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, brokerMetrics,
+ assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, propertyStore, brokerMetrics,
adaptiveServerSelector) instanceof BalancedInstanceSelector);
// Instance selector type is not configured
RoutingConfig routingConfig = mock(RoutingConfig.class);
when(tableConfig.getRoutingConfig()).thenReturn(routingConfig);
- assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, brokerMetrics,
+ assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, propertyStore, brokerMetrics,
adaptiveServerSelector) instanceof BalancedInstanceSelector);
// Replica-group instance selector should be returned
when(routingConfig.getInstanceSelectorType()).thenReturn(RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE);
- assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, brokerMetrics,
+ assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, propertyStore, brokerMetrics,
adaptiveServerSelector) instanceof ReplicaGroupInstanceSelector);
// Strict replica-group instance selector should be returned
when(routingConfig.getInstanceSelectorType()).thenReturn(RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE);
- assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, brokerMetrics,
+ assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, propertyStore, brokerMetrics,
adaptiveServerSelector) instanceof StrictReplicaGroupInstanceSelector);
// Should be backward-compatible with legacy config
@@ -82,12 +92,12 @@ public class InstanceSelectorTest {
when(tableConfig.getTableType()).thenReturn(TableType.OFFLINE);
when(routingConfig.getRoutingTableBuilderName()).thenReturn(
InstanceSelectorFactory.LEGACY_REPLICA_GROUP_OFFLINE_ROUTING);
- assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, brokerMetrics,
+ assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, propertyStore, brokerMetrics,
adaptiveServerSelector) instanceof ReplicaGroupInstanceSelector);
when(tableConfig.getTableType()).thenReturn(TableType.REALTIME);
when(routingConfig.getRoutingTableBuilderName()).thenReturn(
InstanceSelectorFactory.LEGACY_REPLICA_GROUP_REALTIME_ROUTING);
- assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, brokerMetrics,
+ assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, propertyStore, brokerMetrics,
adaptiveServerSelector) instanceof ReplicaGroupInstanceSelector);
}
@@ -169,6 +179,8 @@ public class InstanceSelectorTest {
replicaGroupInstanceSelector.init(enabledInstances, idealState, externalView, onlineSegments);
strictReplicaGroupInstanceSelector.init(enabledInstances, idealState, externalView, onlineSegments);
+ int requestId = 0;
+
// For the 1st request:
// BalancedInstanceSelector:
// segment0 -> instance0
@@ -189,7 +201,8 @@ public class InstanceSelectorTest {
expectedBalancedInstanceSelectorResult.put(segment1, instance2);
expectedBalancedInstanceSelectorResult.put(segment2, instance1);
expectedBalancedInstanceSelectorResult.put(segment3, instance3);
- InstanceSelector.SelectionResult selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ InstanceSelector.SelectionResult selectionResult = balancedInstanceSelector.select(brokerRequest, segments,
+ requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedBalancedInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
Map<String, String> expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -197,10 +210,10 @@ public class InstanceSelectorTest {
expectedReplicaGroupInstanceSelectorResult.put(segment1, instance0);
expectedReplicaGroupInstanceSelectorResult.put(segment2, instance1);
expectedReplicaGroupInstanceSelectorResult.put(segment3, instance1);
- selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -215,12 +228,13 @@ public class InstanceSelectorTest {
// segment1 -> instance2
// segment2 -> instance3
// segment3 -> instance3
+ requestId++;
expectedBalancedInstanceSelectorResult = new HashMap<>();
expectedBalancedInstanceSelectorResult.put(segment0, instance2);
expectedBalancedInstanceSelectorResult.put(segment1, instance0);
expectedBalancedInstanceSelectorResult.put(segment2, instance3);
expectedBalancedInstanceSelectorResult.put(segment3, instance1);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedBalancedInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -228,10 +242,10 @@ public class InstanceSelectorTest {
expectedReplicaGroupInstanceSelectorResult.put(segment1, instance2);
expectedReplicaGroupInstanceSelectorResult.put(segment2, instance3);
expectedReplicaGroupInstanceSelectorResult.put(segment3, instance3);
- selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -252,12 +266,13 @@ public class InstanceSelectorTest {
// segment1 -> instance2
// segment2 -> instance1
// segment3 -> instance1
+ requestId++;
expectedBalancedInstanceSelectorResult = new HashMap<>();
expectedBalancedInstanceSelectorResult.put(segment0, instance2);
expectedBalancedInstanceSelectorResult.put(segment1, instance2);
expectedBalancedInstanceSelectorResult.put(segment2, instance1);
expectedBalancedInstanceSelectorResult.put(segment3, instance3);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedBalancedInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -265,10 +280,10 @@ public class InstanceSelectorTest {
expectedReplicaGroupInstanceSelectorResult.put(segment1, instance2);
expectedReplicaGroupInstanceSelectorResult.put(segment2, instance1);
expectedReplicaGroupInstanceSelectorResult.put(segment3, instance1);
- selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -283,12 +298,13 @@ public class InstanceSelectorTest {
// segment1 -> instance2
// segment2 -> instance3
// segment3 -> instance3
+ requestId++;
expectedBalancedInstanceSelectorResult = new HashMap<>();
expectedBalancedInstanceSelectorResult.put(segment0, instance2);
expectedBalancedInstanceSelectorResult.put(segment1, instance2);
expectedBalancedInstanceSelectorResult.put(segment2, instance3);
expectedBalancedInstanceSelectorResult.put(segment3, instance1);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedBalancedInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -296,10 +312,10 @@ public class InstanceSelectorTest {
expectedReplicaGroupInstanceSelectorResult.put(segment1, instance2);
expectedReplicaGroupInstanceSelectorResult.put(segment2, instance3);
expectedReplicaGroupInstanceSelectorResult.put(segment3, instance3);
- selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -326,21 +342,22 @@ public class InstanceSelectorTest {
// segment2 -> instance1
// segment3 -> instance1
// segment4 -> null
+ requestId++;
expectedBalancedInstanceSelectorResult = new HashMap<>();
expectedBalancedInstanceSelectorResult.put(segment1, instance2);
expectedBalancedInstanceSelectorResult.put(segment2, instance3);
expectedBalancedInstanceSelectorResult.put(segment3, instance1);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedBalancedInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
expectedReplicaGroupInstanceSelectorResult.put(segment1, instance2);
expectedReplicaGroupInstanceSelectorResult.put(segment2, instance1);
expectedReplicaGroupInstanceSelectorResult.put(segment3, instance1);
- selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -355,21 +372,22 @@ public class InstanceSelectorTest {
// segment2 -> instance3
// segment3 -> instance3
// segment4 -> null
+ requestId++;
expectedBalancedInstanceSelectorResult = new HashMap<>();
expectedBalancedInstanceSelectorResult.put(segment1, instance2);
expectedBalancedInstanceSelectorResult.put(segment2, instance1);
expectedBalancedInstanceSelectorResult.put(segment3, instance3);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedBalancedInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
expectedReplicaGroupInstanceSelectorResult.put(segment1, instance2);
expectedReplicaGroupInstanceSelectorResult.put(segment2, instance3);
expectedReplicaGroupInstanceSelectorResult.put(segment3, instance3);
- selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -389,12 +407,13 @@ public class InstanceSelectorTest {
// segment2 -> instance1
// segment3 -> instance1
// segment4 -> instance2
+ requestId++;
expectedBalancedInstanceSelectorResult = new HashMap<>();
expectedBalancedInstanceSelectorResult.put(segment1, instance2);
expectedBalancedInstanceSelectorResult.put(segment2, instance3);
expectedBalancedInstanceSelectorResult.put(segment3, instance1);
expectedBalancedInstanceSelectorResult.put(segment4, instance2);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedBalancedInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -402,10 +421,10 @@ public class InstanceSelectorTest {
expectedReplicaGroupInstanceSelectorResult.put(segment2, instance1);
expectedReplicaGroupInstanceSelectorResult.put(segment3, instance1);
expectedReplicaGroupInstanceSelectorResult.put(segment4, instance2);
- selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -420,12 +439,13 @@ public class InstanceSelectorTest {
// segment2 -> instance3
// segment3 -> instance3
// segment4 -> instance2
+ requestId++;
expectedBalancedInstanceSelectorResult = new HashMap<>();
expectedBalancedInstanceSelectorResult.put(segment1, instance2);
expectedBalancedInstanceSelectorResult.put(segment2, instance1);
expectedBalancedInstanceSelectorResult.put(segment3, instance3);
expectedBalancedInstanceSelectorResult.put(segment4, instance2);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedBalancedInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -433,10 +453,10 @@ public class InstanceSelectorTest {
expectedReplicaGroupInstanceSelectorResult.put(segment2, instance3);
expectedReplicaGroupInstanceSelectorResult.put(segment3, instance3);
expectedReplicaGroupInstanceSelectorResult.put(segment4, instance2);
- selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -457,12 +477,13 @@ public class InstanceSelectorTest {
// segment2 -> instance1
// segment3 -> instance1
// segment4 -> instance0
+ requestId++;
expectedBalancedInstanceSelectorResult = new HashMap<>();
expectedBalancedInstanceSelectorResult.put(segment1, instance0);
expectedBalancedInstanceSelectorResult.put(segment2, instance3);
expectedBalancedInstanceSelectorResult.put(segment3, instance1);
expectedBalancedInstanceSelectorResult.put(segment4, instance2);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedBalancedInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -470,10 +491,10 @@ public class InstanceSelectorTest {
expectedReplicaGroupInstanceSelectorResult.put(segment2, instance1);
expectedReplicaGroupInstanceSelectorResult.put(segment3, instance1);
expectedReplicaGroupInstanceSelectorResult.put(segment4, instance0);
- selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -488,12 +509,13 @@ public class InstanceSelectorTest {
// segment2 -> instance3
// segment3 -> instance3
// segment4 -> instance2
+ requestId++;
expectedBalancedInstanceSelectorResult = new HashMap<>();
expectedBalancedInstanceSelectorResult.put(segment1, instance2);
expectedBalancedInstanceSelectorResult.put(segment2, instance1);
expectedBalancedInstanceSelectorResult.put(segment3, instance3);
expectedBalancedInstanceSelectorResult.put(segment4, instance0);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedBalancedInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -501,10 +523,10 @@ public class InstanceSelectorTest {
expectedReplicaGroupInstanceSelectorResult.put(segment2, instance3);
expectedReplicaGroupInstanceSelectorResult.put(segment3, instance3);
expectedReplicaGroupInstanceSelectorResult.put(segment4, instance2);
- selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -534,12 +556,13 @@ public class InstanceSelectorTest {
// segment2 -> instance1
// segment3 -> instance1
// segment4 -> instance2
+ requestId++;
expectedBalancedInstanceSelectorResult = new HashMap<>();
expectedBalancedInstanceSelectorResult.put(segment1, instance2);
expectedBalancedInstanceSelectorResult.put(segment2, instance3);
expectedBalancedInstanceSelectorResult.put(segment3, instance1);
expectedBalancedInstanceSelectorResult.put(segment4, instance2);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedBalancedInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -547,7 +570,7 @@ public class InstanceSelectorTest {
expectedReplicaGroupInstanceSelectorResult.put(segment2, instance1);
expectedReplicaGroupInstanceSelectorResult.put(segment3, instance1);
expectedReplicaGroupInstanceSelectorResult.put(segment4, instance0);
- selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
Map<String, String> expectedStrictReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -555,7 +578,7 @@ public class InstanceSelectorTest {
expectedStrictReplicaGroupInstanceSelectorResult.put(segment2, instance1);
expectedStrictReplicaGroupInstanceSelectorResult.put(segment3, instance1);
expectedStrictReplicaGroupInstanceSelectorResult.put(segment4, instance2);
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedStrictReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -570,12 +593,13 @@ public class InstanceSelectorTest {
// segment2 -> instance3
// segment3 -> instance3
// segment4 -> instance2
+ requestId++;
expectedBalancedInstanceSelectorResult = new HashMap<>();
expectedBalancedInstanceSelectorResult.put(segment1, instance2);
expectedBalancedInstanceSelectorResult.put(segment2, instance1);
expectedBalancedInstanceSelectorResult.put(segment3, instance3);
expectedBalancedInstanceSelectorResult.put(segment4, instance0);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedBalancedInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
@@ -583,10 +607,10 @@ public class InstanceSelectorTest {
expectedReplicaGroupInstanceSelectorResult.put(segment2, instance3);
expectedReplicaGroupInstanceSelectorResult.put(segment3, instance3);
expectedReplicaGroupInstanceSelectorResult.put(segment4, instance2);
- selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments, requestId);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
}
@@ -668,12 +692,11 @@ public class InstanceSelectorTest {
expectedReplicaGroupInstanceSelectorResult.put(segments.get(9), instance1);
expectedReplicaGroupInstanceSelectorResult.put(segments.get(10), instance0);
expectedReplicaGroupInstanceSelectorResult.put(segments.get(11), instance1);
- InstanceSelector.SelectionResult selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments);
+ InstanceSelector.SelectionResult selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments, 0);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
}
-
@Test
public void testReplicaGroupInstanceSelectorNumReplicaGroupsToQueryGreaterThanReplicas() {
String offlineTableName = "testTable_OFFLINE";
@@ -752,7 +775,7 @@ public class InstanceSelectorTest {
expectedReplicaGroupInstanceSelectorResult.put(segments.get(9), instance0);
expectedReplicaGroupInstanceSelectorResult.put(segments.get(10), instance1);
expectedReplicaGroupInstanceSelectorResult.put(segments.get(11), instance2);
- InstanceSelector.SelectionResult selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments);
+ InstanceSelector.SelectionResult selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments, 0);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
}
@@ -814,14 +837,126 @@ public class InstanceSelectorTest {
for (String segment: segments) {
expectedReplicaGroupInstanceSelectorResult.put(segment, instance0);
}
- InstanceSelector.SelectionResult selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments);
+ InstanceSelector.SelectionResult selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments, 0);
assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
for (String segment: segments) {
expectedReplicaGroupInstanceSelectorResult.put(segment, instance1);
}
- selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments, 1);
+ assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
+ }
+
+  @Test
+  public void testMultiStageStrictReplicaGroupSelector() {
+    String offlineTableName = "testTable_OFFLINE";
+    // Two replica-groups with 2 instances each, 1 partition; all segments of a request must land in one group.
+    List<String> replicaGroup0 = ImmutableList.of("instance-0", "instance-1");
+    List<String> replicaGroup1 = ImmutableList.of("instance-2", "instance-3");
+    Map<String, List<String>> partitionToInstances = ImmutableMap.of(
+        "0_0", replicaGroup0,
+        "0_1", replicaGroup1);
+    InstancePartitions instancePartitions = new InstancePartitions(offlineTableName);
+    instancePartitions.setInstances(0, 0, partitionToInstances.get("0_0"));
+    instancePartitions.setInstances(0, 1, partitionToInstances.get("0_1"));
+    BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
+    BrokerRequest brokerRequest = mock(BrokerRequest.class);
+    PinotQuery pinotQuery = mock(PinotQuery.class);
+    Map<String, String> queryOptions = new HashMap<>();
+
+    when(brokerRequest.getPinotQuery()).thenReturn(pinotQuery);
+    when(pinotQuery.getQueryOptions()).thenReturn(queryOptions);
+
+    ZkHelixPropertyStore<ZNRecord> propertyStore = (ZkHelixPropertyStore<ZNRecord>) mock(ZkHelixPropertyStore.class);
+
+    MultiStageReplicaGroupSelector multiStageSelector =
+        new MultiStageReplicaGroupSelector(offlineTableName, propertyStore, brokerMetrics, null);
+    multiStageSelector = spy(multiStageSelector);
+    doReturn(instancePartitions).when(multiStageSelector).getInstancePartitions();
+
+    List<String> enabledInstances = new ArrayList<>();
+    IdealState idealState = new IdealState(offlineTableName);
+    Map<String, Map<String, String>> idealStateSegmentAssignment = idealState.getRecord().getMapFields();
+    ExternalView externalView = new ExternalView(offlineTableName);
+    Map<String, Map<String, String>> externalViewSegmentAssignment = externalView.getRecord().getMapFields();
+    Set<String> onlineSegments = new HashSet<>();
+
+    // Mark all instances as enabled
+    for (int i = 0; i < 4; i++) {
+      enabledInstances.add(String.format("instance-%d", i));
+    }
+
+    List<String> segments = getSegments();
+
+    // Map0 pair: instance-0/2, used for even segments. Map1 pair: instance-1/3, used for odd segments.
+    Map<String, String> idealStateInstanceStateMap0 = new TreeMap<>();
+    Map<String, String> externalViewInstanceStateMap0 = new TreeMap<>();
+    Map<String, String> idealStateInstanceStateMap1 = new TreeMap<>();
+    Map<String, String> externalViewInstanceStateMap1 = new TreeMap<>();
+
+    // instance-0 and instance-2 mirror each other in the two replica-groups. Same for instance-1 and instance-3.
+    for (int i = 0; i < 4; i++) {
+      String instance = enabledInstances.get(i);
+      if (i % 2 == 0) {
+        idealStateInstanceStateMap0.put(instance, ONLINE);
+        externalViewInstanceStateMap0.put(instance, ONLINE);
+      } else {
+        idealStateInstanceStateMap1.put(instance, ONLINE);
+        externalViewInstanceStateMap1.put(instance, ONLINE);
+      }
+    }
+
+    // Even-numbered segments are assigned to [instance-0, instance-2], and odd-numbered segments are assigned to
+    // [instance-1, instance-3].
+    for (int segmentNum = 0; segmentNum < segments.size(); segmentNum++) {
+      String segment = segments.get(segmentNum);
+      if (segmentNum % 2 == 0) {
+        idealStateSegmentAssignment.put(segment, idealStateInstanceStateMap0);
+        externalViewSegmentAssignment.put(segment, externalViewInstanceStateMap0);
+      } else {
+        idealStateSegmentAssignment.put(segment, idealStateInstanceStateMap1);
+        externalViewSegmentAssignment.put(segment, externalViewInstanceStateMap1);
+      }
+      onlineSegments.add(segment);
+    }
+
+    multiStageSelector.init(new HashSet<>(enabledInstances), idealState, externalView, onlineSegments);
+
+    // Using requestId=0 should select replica-group 0. Even segments get assigned to instance-0 and odd segments get
+    // assigned to instance-1.
+    Map<String, String> expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
+    for (int segmentNum = 0; segmentNum < segments.size(); segmentNum++) {
+      expectedReplicaGroupInstanceSelectorResult.put(segments.get(segmentNum), replicaGroup0.get(segmentNum % 2));
+    }
+    InstanceSelector.SelectionResult selectionResult = multiStageSelector.select(brokerRequest, segments, 0);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
+
+    // Selection is deterministic: the same requestId again must return the same mapping
+    selectionResult = multiStageSelector.select(brokerRequest, segments, 0);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
+
+    // Using requestId=1 should select replica-group 1 (instance-2 for even segments, instance-3 for odd ones)
+    expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
+    for (int segmentNum = 0; segmentNum < segments.size(); segmentNum++) {
+      expectedReplicaGroupInstanceSelectorResult.put(segments.get(segmentNum), replicaGroup1.get(segmentNum % 2));
+    }
+    selectionResult = multiStageSelector.select(brokerRequest, segments, 1);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
+
+    // If instance-0 is disabled, replica-group 1 should be picked even with requestId=0 (expected map is reused)
+    enabledInstances.remove("instance-0");
+    multiStageSelector.init(new HashSet<>(enabledInstances), idealState, externalView, onlineSegments);
+    selectionResult = multiStageSelector.select(brokerRequest, segments, 0);
    assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
+
+    // If instance-2 is also disabled, every replica-group has a disabled instance, so no group is eligible
+    enabledInstances.remove("instance-2");
+    multiStageSelector.init(new HashSet<>(enabledInstances), idealState, externalView, onlineSegments);
+    try {
+      multiStageSelector.select(brokerRequest, segments, 0);
+      fail("Method call above should have failed");
+    } catch (Exception ignored) { // expected; AssertionError from fail() is an Error, so it is not caught here
+    }
  }
@Test
@@ -880,10 +1015,10 @@ public class InstanceSelectorTest {
PinotQuery pinotQuery = mock(PinotQuery.class);
when(brokerRequest.getPinotQuery()).thenReturn(pinotQuery);
when(pinotQuery.getQueryOptions()).thenReturn(null);
- InstanceSelector.SelectionResult selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ InstanceSelector.SelectionResult selectionResult = balancedInstanceSelector.select(brokerRequest, segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertEquals(selectionResult.getUnavailableSegments(), Arrays.asList(segment0, segment1));
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertEquals(selectionResult.getUnavailableSegments(), Arrays.asList(segment0, segment1));
@@ -904,10 +1039,10 @@ public class InstanceSelectorTest {
enabledInstances.add(errorInstance);
balancedInstanceSelector.onInstancesChange(enabledInstances, Collections.singletonList(errorInstance));
strictReplicaGroupInstanceSelector.onInstancesChange(enabledInstances, Collections.singletonList(errorInstance));
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertEquals(selectionResult.getUnavailableSegments(), Arrays.asList(segment0, segment1));
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertEquals(selectionResult.getUnavailableSegments(), Arrays.asList(segment0, segment1));
@@ -925,10 +1060,10 @@ public class InstanceSelectorTest {
enabledInstances.add(instance);
balancedInstanceSelector.onInstancesChange(enabledInstances, Collections.singletonList(instance));
strictReplicaGroupInstanceSelector.onInstancesChange(enabledInstances, Collections.singletonList(instance));
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments, 0);
assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -936,10 +1071,10 @@ public class InstanceSelectorTest {
idealStateInstanceStateMap.put(instance, ONLINE);
balancedInstanceSelector.onAssignmentChange(idealState, externalView, onlineSegments);
strictReplicaGroupInstanceSelector.onAssignmentChange(idealState, externalView, onlineSegments);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments, 0);
assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -958,10 +1093,10 @@ public class InstanceSelectorTest {
externalViewInstanceStateMap1.put(instance, ONLINE);
balancedInstanceSelector.onAssignmentChange(idealState, externalView, onlineSegments);
strictReplicaGroupInstanceSelector.onAssignmentChange(idealState, externalView, onlineSegments);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments, 0);
assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -969,10 +1104,10 @@ public class InstanceSelectorTest {
idealStateInstanceStateMap.remove(instance);
balancedInstanceSelector.onAssignmentChange(idealState, externalView, onlineSegments);
strictReplicaGroupInstanceSelector.onAssignmentChange(idealState, externalView, onlineSegments);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertEquals(selectionResult.getUnavailableSegments(), Arrays.asList(segment0, segment1));
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertEquals(selectionResult.getUnavailableSegments(), Arrays.asList(segment0, segment1));
@@ -993,10 +1128,10 @@ public class InstanceSelectorTest {
externalViewInstanceStateMap1.put(errorInstance, ONLINE);
balancedInstanceSelector.onAssignmentChange(idealState, externalView, onlineSegments);
strictReplicaGroupInstanceSelector.onAssignmentChange(idealState, externalView, onlineSegments);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments, 0);
assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertEquals(selectionResult.getUnavailableSegments(), Arrays.asList(segment0, segment1));
@@ -1017,10 +1152,10 @@ public class InstanceSelectorTest {
externalViewInstanceStateMap1.put(errorInstance, ERROR);
balancedInstanceSelector.onAssignmentChange(idealState, externalView, onlineSegments);
strictReplicaGroupInstanceSelector.onAssignmentChange(idealState, externalView, onlineSegments);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
@@ -1038,10 +1173,10 @@ public class InstanceSelectorTest {
enabledInstances.remove(instance);
balancedInstanceSelector.onInstancesChange(enabledInstances, Collections.singletonList(instance));
strictReplicaGroupInstanceSelector.onInstancesChange(enabledInstances, Collections.singletonList(instance));
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertEquals(selectionResult.getUnavailableSegments(), Arrays.asList(segment0, segment1));
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertEquals(selectionResult.getUnavailableSegments(), Arrays.asList(segment0, segment1));
@@ -1061,10 +1196,10 @@ public class InstanceSelectorTest {
externalViewInstanceStateMap0.put(errorInstance, ONLINE);
balancedInstanceSelector.onAssignmentChange(idealState, externalView, onlineSegments);
strictReplicaGroupInstanceSelector.onAssignmentChange(idealState, externalView, onlineSegments);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments, 0);
assertEquals(selectionResult.getSegmentToInstanceMap().size(), 1);
assertEquals(selectionResult.getUnavailableSegments(), Collections.singletonList(segment1));
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
assertEquals(selectionResult.getSegmentToInstanceMap().size(), 1);
assertEquals(selectionResult.getUnavailableSegments(), Collections.singletonList(segment1));
@@ -1082,10 +1217,10 @@ public class InstanceSelectorTest {
enabledInstances.remove(errorInstance);
balancedInstanceSelector.onInstancesChange(enabledInstances, Collections.singletonList(errorInstance));
strictReplicaGroupInstanceSelector.onInstancesChange(enabledInstances, Collections.singletonList(errorInstance));
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertEquals(selectionResult.getUnavailableSegments(), Arrays.asList(segment0, segment1));
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertEquals(selectionResult.getUnavailableSegments(), Arrays.asList(segment0, segment1));
@@ -1106,10 +1241,10 @@ public class InstanceSelectorTest {
externalViewInstanceStateMap1.put(instance, CONSUMING);
balancedInstanceSelector.onAssignmentChange(idealState, externalView, onlineSegments);
strictReplicaGroupInstanceSelector.onAssignmentChange(idealState, externalView, onlineSegments);
- selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+ selectionResult = balancedInstanceSelector.select(brokerRequest, segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertEquals(selectionResult.getUnavailableSegments(), Arrays.asList(segment0, segment1));
- selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+ selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments, 0);
assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
assertEquals(selectionResult.getUnavailableSegments(), Arrays.asList(segment0, segment1));
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
index db535dcaa6..857f0207da 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
@@ -50,7 +50,7 @@ public interface RoutingManager {
* @param brokerRequest the broker request constructed from a query.
* @return the route table.
*/
- RoutingTable getRoutingTable(BrokerRequest brokerRequest);
+ RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId);
/**
* Validate routing exist for a table
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 814837ec15..d84a70d3f8 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -125,11 +125,11 @@ public class QueryEnvironment {
* @param sqlNodeAndOptions parsed SQL query.
* @return a dispatchable query plan
*/
- public QueryPlan planQuery(String sqlQuery, SqlNodeAndOptions sqlNodeAndOptions) {
+ public QueryPlan planQuery(String sqlQuery, SqlNodeAndOptions sqlNodeAndOptions, long requestId) {
try (PlannerContext plannerContext = new PlannerContext(_config, _catalogReader, _typeFactory, _hepProgram)) {
plannerContext.setOptions(sqlNodeAndOptions.getOptions());
RelRoot relRoot = compileQuery(sqlNodeAndOptions.getSqlNode(), plannerContext);
- return toDispatchablePlan(relRoot, plannerContext);
+ return toDispatchablePlan(relRoot, plannerContext, requestId);
} catch (CalciteContextException e) {
throw new RuntimeException("Error composing query plan for '" + sqlQuery
+ "': " + e.getMessage() + "'", e);
@@ -141,9 +141,9 @@ public class QueryEnvironment {
/**
* Explain a SQL query.
*
- * Similar to {@link QueryEnvironment#planQuery(String, SqlNodeAndOptions)}, this API runs the query compilation.
- * But it doesn't run the distributed {@link QueryPlan} generation, instead it only returns the explained logical
- * plan.
+ * Similar to {@link QueryEnvironment#planQuery(String, SqlNodeAndOptions, long)}, this API runs the query
+ * compilation. But it doesn't run the distributed {@link QueryPlan} generation, instead it only returns the
+ * explained logical plan.
*
* @param sqlQuery SQL query string.
* @param sqlNodeAndOptions parsed SQL query.
@@ -165,7 +165,7 @@ public class QueryEnvironment {
@VisibleForTesting
public QueryPlan planQuery(String sqlQuery) {
- return planQuery(sqlQuery, CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery));
+ return planQuery(sqlQuery, CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery), 0);
}
@VisibleForTesting
@@ -219,9 +219,9 @@ public class QueryEnvironment {
}
}
- private QueryPlan toDispatchablePlan(RelRoot relRoot, PlannerContext plannerContext) {
+ private QueryPlan toDispatchablePlan(RelRoot relRoot, PlannerContext plannerContext, long requestId) {
// 5. construct a dispatchable query plan.
- StagePlanner queryStagePlanner = new StagePlanner(plannerContext, _workerManager);
+ StagePlanner queryStagePlanner = new StagePlanner(plannerContext, _workerManager, requestId);
return queryStagePlanner.makePlan(relRoot);
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index 2d61856c85..5f46b23d26 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -44,10 +44,12 @@ public class StagePlanner {
private final PlannerContext _plannerContext; // DO NOT REMOVE.
private final WorkerManager _workerManager;
private int _stageIdCounter;
+ private long _requestId;
- public StagePlanner(PlannerContext plannerContext, WorkerManager workerManager) {
+ public StagePlanner(PlannerContext plannerContext, WorkerManager workerManager, long requestId) {
_plannerContext = plannerContext;
_workerManager = workerManager;
+ _requestId = requestId;
}
/**
@@ -79,7 +81,7 @@ public class StagePlanner {
// assign workers to each stage.
for (Map.Entry<Integer, StageMetadata> e : queryPlan.getStageMetadataMap().entrySet()) {
- _workerManager.assignWorkerToStage(e.getKey(), e.getValue());
+ _workerManager.assignWorkerToStage(e.getKey(), e.getValue(), _requestId);
}
return queryPlan;
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 112aec606b..42bb19d269 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -58,12 +58,12 @@ public class WorkerManager {
_routingManager = routingManager;
}
- public void assignWorkerToStage(int stageId, StageMetadata stageMetadata) {
+ public void assignWorkerToStage(int stageId, StageMetadata stageMetadata, long requestId) {
List<String> scannedTables = stageMetadata.getScannedTables();
if (scannedTables.size() == 1) {
// table scan stage, need to attach server as well as segment info for each physical table type.
String logicalTableName = scannedTables.get(0);
- Map<String, RoutingTable> routingTableMap = getRoutingTable(logicalTableName);
+ Map<String, RoutingTable> routingTableMap = getRoutingTable(logicalTableName, requestId);
if (routingTableMap.size() == 0) {
throw new IllegalArgumentException("Unable to find routing entries for table: " + logicalTableName);
}
@@ -124,22 +124,22 @@ public class WorkerManager {
* @param logicalTableName it can either be a hybrid table name or a physical table name with table type.
* @return keyed-map from table type(s) to routing table(s).
*/
- private Map<String, RoutingTable> getRoutingTable(String logicalTableName) {
+ private Map<String, RoutingTable> getRoutingTable(String logicalTableName, long requestId) {
String rawTableName = TableNameBuilder.extractRawTableName(logicalTableName);
TableType tableType = TableNameBuilder.getTableTypeFromTableName(logicalTableName);
Map<String, RoutingTable> routingTableMap = new HashMap<>();
RoutingTable routingTable;
if (tableType == null) {
- routingTable = getRoutingTable(rawTableName, TableType.OFFLINE);
+ routingTable = getRoutingTable(rawTableName, TableType.OFFLINE, requestId);
if (routingTable != null) {
routingTableMap.put(TableType.OFFLINE.name(), routingTable);
}
- routingTable = getRoutingTable(rawTableName, TableType.REALTIME);
+ routingTable = getRoutingTable(rawTableName, TableType.REALTIME, requestId);
if (routingTable != null) {
routingTableMap.put(TableType.REALTIME.name(), routingTable);
}
} else {
- routingTable = getRoutingTable(logicalTableName, tableType);
+ routingTable = getRoutingTable(logicalTableName, tableType, requestId);
if (routingTable != null) {
routingTableMap.put(tableType.name(), routingTable);
}
@@ -147,10 +147,10 @@ public class WorkerManager {
return routingTableMap;
}
- private RoutingTable getRoutingTable(String tableName, TableType tableType) {
+ private RoutingTable getRoutingTable(String tableName, TableType tableType, long requestId) {
String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(
TableNameBuilder.extractRawTableName(tableName));
return _routingManager.getRoutingTable(
- CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + tableNameWithType));
+ CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + tableNameWithType), requestId);
}
}
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
index 9d2d5e1a22..5e75de0455 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
@@ -144,7 +144,7 @@ public class MockRoutingManagerFactory {
}
@Override
- public RoutingTable getRoutingTable(BrokerRequest brokerRequest) {
+ public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId) {
String tableName = brokerRequest.getPinotQuery().getDataSource().getTableName();
return _routingTableMap.getOrDefault(tableName,
_routingTableMap.get(TableNameBuilder.extractRawTableName(tableName)));
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
index 2c238aa287..8af5773675 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
@@ -31,6 +31,7 @@ public class RoutingConfig extends BaseJsonConfig {
public static final String EMPTY_SEGMENT_PRUNER_TYPE = "empty";
public static final String REPLICA_GROUP_INSTANCE_SELECTOR_TYPE = "replicaGroup";
public static final String STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE = "strictReplicaGroup";
+ public static final String MULTI_STAGE_REPLICA_GROUP_SELECTOR_TYPE = "multiStageReplicaGroup";
// Replaced by _segmentPrunerTypes and _instanceSelectorType
@Deprecated
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org