You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/09/26 20:29:00 UTC
[pinot] branch master updated: Adaptive Server Selection: address review comments (#9462)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7b4ef53e36 Adaptive Server Selection: address review comments (#9462)
7b4ef53e36 is described below
commit 7b4ef53e3619651a434afefdc7477985cbb52952
Author: Vivek Iyer Vaidyanathan <vv...@gmail.com>
AuthorDate: Mon Sep 26 13:28:54 2022 -0700
Adaptive Server Selection: address review comments (#9462)
---
.../instanceselector/BalancedInstanceSelector.java | 11 ++++++-----
.../routing/instanceselector/BaseInstanceSelector.java | 7 +++----
.../instanceselector/ReplicaGroupInstanceSelector.java | 17 +++++++++--------
.../helix/core/PinotHelixResourceManager.java | 2 +-
.../apache/pinot/core/transport/AsyncQueryResponse.java | 8 ++++----
.../org/apache/pinot/core/transport/QueryResponse.java | 2 +-
.../org/apache/pinot/core/transport/QueryRouter.java | 4 ++--
7 files changed, 26 insertions(+), 25 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
index f4e0a161f2..de5eb9f530 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
@@ -51,8 +51,7 @@ public class BalancedInstanceSelector extends BaseInstanceSelector {
@Override
Map<String, String> select(List<String> segments, int requestId,
- Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions,
- @Nullable AdaptiveServerSelector adaptiveServerSelector) {
+ Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions) {
Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
for (String segment : segments) {
List<String> enabledInstances = segmentToEnabledInstancesMap.get(segment);
@@ -62,9 +61,11 @@ public class BalancedInstanceSelector extends BaseInstanceSelector {
continue;
}
- String selectedServer = enabledInstances.get(requestId++ % enabledInstances.size());
- if (adaptiveServerSelector != null) {
- selectedServer = adaptiveServerSelector.select(enabledInstances);
+ String selectedServer;
+ if (_adaptiveServerSelector != null) {
+ selectedServer = _adaptiveServerSelector.select(enabledInstances);
+ } else {
+ selectedServer = enabledInstances.get(requestId++ % enabledInstances.size());
}
segmentToSelectedInstanceMap.put(segment, selectedServer);
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
index 6fe91347ae..9b92d6031c 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
@@ -55,7 +55,7 @@ abstract class BaseInstanceSelector implements InstanceSelector {
private final AtomicLong _requestId = new AtomicLong();
private final String _tableNameWithType;
private final BrokerMetrics _brokerMetrics;
- private final AdaptiveServerSelector _adaptiveServerSelector;
+ protected final AdaptiveServerSelector _adaptiveServerSelector;
// These 4 variables are the cached states to help accelerate the change processing
private Set<String> _enabledInstances;
@@ -274,7 +274,7 @@ abstract class BaseInstanceSelector implements InstanceSelector {
? brokerRequest.getPinotQuery().getQueryOptions()
: Collections.emptyMap();
Map<String, String> segmentToInstanceMap = select(segments, requestId, _segmentToEnabledInstancesMap,
- queryOptions, _adaptiveServerSelector);
+ queryOptions);
Set<String> unavailableSegments = _unavailableSegments;
if (unavailableSegments.isEmpty()) {
return new SelectionResult(segmentToInstanceMap, Collections.emptyList());
@@ -296,6 +296,5 @@ abstract class BaseInstanceSelector implements InstanceSelector {
* ONLINE/CONSUMING instances). If enabled instances are not {@code null}, they are sorted in alphabetical order.
*/
abstract Map<String, String> select(List<String> segments, int requestId,
- Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions,
- AdaptiveServerSelector adaptiveServerSelector);
+ Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions);
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
index ad00560dcf..a9ca487ff5 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
@@ -59,30 +59,31 @@ import org.apache.pinot.core.util.QueryOptionsUtils;
public class ReplicaGroupInstanceSelector extends BaseInstanceSelector {
public ReplicaGroupInstanceSelector(String tableNameWithType, BrokerMetrics brokerMetrics,
- AdaptiveServerSelector adaptiveServerSelector) {
+ @Nullable AdaptiveServerSelector adaptiveServerSelector) {
super(tableNameWithType, brokerMetrics, adaptiveServerSelector);
}
@Override
Map<String, String> select(List<String> segments, int requestId,
- Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions,
- @Nullable AdaptiveServerSelector adaptiveServerSelector) {
+ Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions) {
Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
- List<String> serverRankList = new ArrayList<>();
- if (adaptiveServerSelector != null) {
+
+ if (_adaptiveServerSelector != null) {
+ // Adaptive Server Selection is enabled.
+ List<String> serverRankList = new ArrayList<>();
+
// Fetch serverRankList before looping through all the segments. This is important to make sure that we pick
// the least amount of instances for a query by referring to a single snapshot of the rankings.
- List<Pair<String, Double>> serverRankListWithScores = adaptiveServerSelector.fetchAllServerRankingsWithScores();
+ List<Pair<String, Double>> serverRankListWithScores = _adaptiveServerSelector.fetchAllServerRankingsWithScores();
for (Pair<String, Double> entry : serverRankListWithScores) {
serverRankList.add(entry.getLeft());
}
- }
- if (serverRankList.size() > 0) {
selectServersUsingAdaptiverServerSelector(segments, requestId, segmentToSelectedInstanceMap,
segmentToEnabledInstancesMap, queryOptions, serverRankList);
} else {
+ // Adaptive Server Selection is NOT enabled.
selectServersUsingRoundRobin(segments, requestId, segmentToSelectedInstanceMap, segmentToEnabledInstancesMap,
queryOptions);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ba4a87e7c0..af25e423b2 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -820,7 +820,7 @@ public class PinotHelixResourceManager {
public synchronized PinotResourceManagerResponse deleteSegments(String tableNameWithType, List<String> segmentNames,
@Nullable String retentionPeriod) {
try {
- LOGGER.info("Trying to delete segments: {} from table: {} ", segmentNames, tableNameWithType);
+ LOGGER.info("Trying to delete segments: {} from table: {} ", segmentNames, tableNameWithType);
Preconditions.checkArgument(TableNameBuilder.isTableResource(tableNameWithType),
"Table name: %s is not a valid table name with type suffix", tableNameWithType);
HelixHelper.removeSegmentsFromIdealState(_helixZkManager, tableNameWithType, segmentNames);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
index 5eaf7c900a..c2162a591e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
@@ -42,7 +42,7 @@ public class AsyncQueryResponse implements QueryResponse {
private final ConcurrentHashMap<ServerRoutingInstance, ServerResponse> _responseMap;
private final CountDownLatch _countDownLatch;
private final long _maxEndTimeMs;
- private final long _queryTimeoutMs;
+ private final long _timeoutMs;
private volatile ServerRoutingInstance _failedServer;
private volatile Exception _exception;
@@ -57,7 +57,7 @@ public class AsyncQueryResponse implements QueryResponse {
_responseMap.put(serverRoutingInstance, new ServerResponse(startTimeMs));
}
_countDownLatch = new CountDownLatch(numServersQueried);
- _queryTimeoutMs = timeoutMs;
+ _timeoutMs = timeoutMs;
_maxEndTimeMs = startTimeMs + timeoutMs;
}
@@ -120,8 +120,8 @@ public class AsyncQueryResponse implements QueryResponse {
}
@Override
- public long getTimeOutMs() {
- return _queryTimeoutMs;
+ public long getTimeoutMs() {
+ return _timeoutMs;
}
void markRequestSubmitted(ServerRoutingInstance serverRoutingInstance) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryResponse.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryResponse.java
index 7c921d796d..23f5946cd3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryResponse.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryResponse.java
@@ -83,7 +83,7 @@ public interface QueryResponse {
/**
* Returns the query timeout in milliseconds.
*/
- long getTimeOutMs();
+ long getTimeoutMs();
/**
* Returns the exception if the query fails.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
index cdc4305bed..06232b1e15 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
@@ -154,7 +154,7 @@ public class QueryRouter {
LOGGER.error("Caught exception while sending request {} to server: {}, marking query failed", requestId,
serverRoutingInstance, e);
_serverRoutingStatsManager.recordStatsUponResponseArrival(requestId, serverRoutingInstance.getInstanceId(),
- (int) asyncQueryResponse.getTimeOutMs());
+ (int) asyncQueryResponse.getTimeoutMs());
asyncQueryResponse.markQueryFailed(serverRoutingInstance, e);
}
@@ -201,7 +201,7 @@ public class QueryRouter {
for (AsyncQueryResponse asyncQueryResponse : _asyncQueryResponseMap.values()) {
asyncQueryResponse.markServerDown(serverRoutingInstance, exception);
_serverRoutingStatsManager.recordStatsUponResponseArrival(asyncQueryResponse.getRequestId(),
- serverRoutingInstance.getInstanceId(), (int) asyncQueryResponse.getTimeOutMs());
+ serverRoutingInstance.getInstanceId(), (int) asyncQueryResponse.getTimeoutMs());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org