You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/09/26 20:29:00 UTC

[pinot] branch master updated: Adaptive Server Selection: address review comments (#9462)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b4ef53e36 Adaptive Server Selection: address review comments (#9462)
7b4ef53e36 is described below

commit 7b4ef53e3619651a434afefdc7477985cbb52952
Author: Vivek Iyer Vaidyanathan <vv...@gmail.com>
AuthorDate: Mon Sep 26 13:28:54 2022 -0700

    Adaptive Server Selection: address review comments (#9462)
---
 .../instanceselector/BalancedInstanceSelector.java      | 11 ++++++-----
 .../routing/instanceselector/BaseInstanceSelector.java  |  7 +++----
 .../instanceselector/ReplicaGroupInstanceSelector.java  | 17 +++++++++--------
 .../helix/core/PinotHelixResourceManager.java           |  2 +-
 .../apache/pinot/core/transport/AsyncQueryResponse.java |  8 ++++----
 .../org/apache/pinot/core/transport/QueryResponse.java  |  2 +-
 .../org/apache/pinot/core/transport/QueryRouter.java    |  4 ++--
 7 files changed, 26 insertions(+), 25 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
index f4e0a161f2..de5eb9f530 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
@@ -51,8 +51,7 @@ public class BalancedInstanceSelector extends BaseInstanceSelector {
 
   @Override
   Map<String, String> select(List<String> segments, int requestId,
-      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions,
-      @Nullable AdaptiveServerSelector adaptiveServerSelector) {
+      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions) {
     Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
     for (String segment : segments) {
       List<String> enabledInstances = segmentToEnabledInstancesMap.get(segment);
@@ -62,9 +61,11 @@ public class BalancedInstanceSelector extends BaseInstanceSelector {
         continue;
       }
 
-      String selectedServer = enabledInstances.get(requestId++ % enabledInstances.size());
-      if (adaptiveServerSelector != null) {
-        selectedServer = adaptiveServerSelector.select(enabledInstances);
+      String selectedServer;
+      if (_adaptiveServerSelector != null) {
+        selectedServer = _adaptiveServerSelector.select(enabledInstances);
+      } else {
+        selectedServer = enabledInstances.get(requestId++ % enabledInstances.size());
       }
 
       segmentToSelectedInstanceMap.put(segment, selectedServer);
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
index 6fe91347ae..9b92d6031c 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
@@ -55,7 +55,7 @@ abstract class BaseInstanceSelector implements InstanceSelector {
   private final AtomicLong _requestId = new AtomicLong();
   private final String _tableNameWithType;
   private final BrokerMetrics _brokerMetrics;
-  private final AdaptiveServerSelector _adaptiveServerSelector;
+  protected final AdaptiveServerSelector _adaptiveServerSelector;
 
   // These 4 variables are the cached states to help accelerate the change processing
   private Set<String> _enabledInstances;
@@ -274,7 +274,7 @@ abstract class BaseInstanceSelector implements InstanceSelector {
         ? brokerRequest.getPinotQuery().getQueryOptions()
         : Collections.emptyMap();
     Map<String, String> segmentToInstanceMap = select(segments, requestId, _segmentToEnabledInstancesMap,
-        queryOptions, _adaptiveServerSelector);
+        queryOptions);
     Set<String> unavailableSegments = _unavailableSegments;
     if (unavailableSegments.isEmpty()) {
       return new SelectionResult(segmentToInstanceMap, Collections.emptyList());
@@ -296,6 +296,5 @@ abstract class BaseInstanceSelector implements InstanceSelector {
    * ONLINE/CONSUMING instances). If enabled instances are not {@code null}, they are sorted in alphabetical order.
    */
   abstract Map<String, String> select(List<String> segments, int requestId,
-      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions,
-      AdaptiveServerSelector adaptiveServerSelector);
+      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions);
 }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
index ad00560dcf..a9ca487ff5 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
@@ -59,30 +59,31 @@ import org.apache.pinot.core.util.QueryOptionsUtils;
 public class ReplicaGroupInstanceSelector extends BaseInstanceSelector {
 
   public ReplicaGroupInstanceSelector(String tableNameWithType, BrokerMetrics brokerMetrics,
-      AdaptiveServerSelector adaptiveServerSelector) {
+      @Nullable AdaptiveServerSelector adaptiveServerSelector) {
     super(tableNameWithType, brokerMetrics, adaptiveServerSelector);
   }
 
   @Override
   Map<String, String> select(List<String> segments, int requestId,
-      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions,
-      @Nullable AdaptiveServerSelector adaptiveServerSelector) {
+      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions) {
     Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
 
-    List<String> serverRankList = new ArrayList<>();
-    if (adaptiveServerSelector != null) {
+
+    if (_adaptiveServerSelector != null) {
+      // Adaptive Server Selection is enabled.
+      List<String> serverRankList = new ArrayList<>();
+
       // Fetch serverRankList before looping through all the segments. This is important to make sure that we pick
       // the least amount of instances for a query by referring to a single snapshot of the rankings.
-      List<Pair<String, Double>> serverRankListWithScores = adaptiveServerSelector.fetchAllServerRankingsWithScores();
+      List<Pair<String, Double>> serverRankListWithScores = _adaptiveServerSelector.fetchAllServerRankingsWithScores();
       for (Pair<String, Double> entry : serverRankListWithScores) {
         serverRankList.add(entry.getLeft());
       }
-    }
 
-    if (serverRankList.size() > 0) {
       selectServersUsingAdaptiverServerSelector(segments, requestId, segmentToSelectedInstanceMap,
           segmentToEnabledInstancesMap, queryOptions, serverRankList);
     } else {
+      // Adaptive Server Selection is NOT enabled.
       selectServersUsingRoundRobin(segments, requestId, segmentToSelectedInstanceMap, segmentToEnabledInstancesMap,
           queryOptions);
     }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ba4a87e7c0..af25e423b2 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -820,7 +820,7 @@ public class PinotHelixResourceManager {
   public synchronized PinotResourceManagerResponse deleteSegments(String tableNameWithType, List<String> segmentNames,
       @Nullable String retentionPeriod) {
     try {
-        LOGGER.info("Trying to delete segments: {} from table: {} ", segmentNames, tableNameWithType);
+      LOGGER.info("Trying to delete segments: {} from table: {} ", segmentNames, tableNameWithType);
       Preconditions.checkArgument(TableNameBuilder.isTableResource(tableNameWithType),
           "Table name: %s is not a valid table name with type suffix", tableNameWithType);
       HelixHelper.removeSegmentsFromIdealState(_helixZkManager, tableNameWithType, segmentNames);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
index 5eaf7c900a..c2162a591e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
@@ -42,7 +42,7 @@ public class AsyncQueryResponse implements QueryResponse {
   private final ConcurrentHashMap<ServerRoutingInstance, ServerResponse> _responseMap;
   private final CountDownLatch _countDownLatch;
   private final long _maxEndTimeMs;
-  private final long _queryTimeoutMs;
+  private final long _timeoutMs;
 
   private volatile ServerRoutingInstance _failedServer;
   private volatile Exception _exception;
@@ -57,7 +57,7 @@ public class AsyncQueryResponse implements QueryResponse {
       _responseMap.put(serverRoutingInstance, new ServerResponse(startTimeMs));
     }
     _countDownLatch = new CountDownLatch(numServersQueried);
-    _queryTimeoutMs = timeoutMs;
+    _timeoutMs = timeoutMs;
     _maxEndTimeMs = startTimeMs + timeoutMs;
   }
 
@@ -120,8 +120,8 @@ public class AsyncQueryResponse implements QueryResponse {
   }
 
   @Override
-  public long getTimeOutMs() {
-    return _queryTimeoutMs;
+  public long getTimeoutMs() {
+    return _timeoutMs;
   }
 
   void markRequestSubmitted(ServerRoutingInstance serverRoutingInstance) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryResponse.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryResponse.java
index 7c921d796d..23f5946cd3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryResponse.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryResponse.java
@@ -83,7 +83,7 @@ public interface QueryResponse {
   /**
    * Returns the query timeout in milliseconds.
    */
-  long getTimeOutMs();
+  long getTimeoutMs();
 
   /**
    * Returns the exception if the query fails.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
index cdc4305bed..06232b1e15 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
@@ -154,7 +154,7 @@ public class QueryRouter {
     LOGGER.error("Caught exception while sending request {} to server: {}, marking query failed", requestId,
         serverRoutingInstance, e);
     _serverRoutingStatsManager.recordStatsUponResponseArrival(requestId, serverRoutingInstance.getInstanceId(),
-        (int) asyncQueryResponse.getTimeOutMs());
+        (int) asyncQueryResponse.getTimeoutMs());
     asyncQueryResponse.markQueryFailed(serverRoutingInstance, e);
   }
 
@@ -201,7 +201,7 @@ public class QueryRouter {
     for (AsyncQueryResponse asyncQueryResponse : _asyncQueryResponseMap.values()) {
       asyncQueryResponse.markServerDown(serverRoutingInstance, exception);
       _serverRoutingStatsManager.recordStatsUponResponseArrival(asyncQueryResponse.getRequestId(),
-          serverRoutingInstance.getInstanceId(), (int) asyncQueryResponse.getTimeOutMs());
+          serverRoutingInstance.getInstanceId(), (int) asyncQueryResponse.getTimeoutMs());
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org