Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/09/21 23:54:44 UTC

[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #9311: Adaptive Server Selection

Jackie-Jiang commented on code in PR #9311:
URL: https://github.com/apache/pinot/pull/9311#discussion_r977067995


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java:
##########
@@ -47,34 +51,108 @@
  * selected such that half the segments will come from S1 and other half from S2. If NUM_REPLICA_GROUPS_TO_QUERY value
  * is much greater than available servers, then ReplicaGroupInstanceSelector will behave similar to
  * BalancedInstanceSelector.
+ * <p>If AdaptiveServerSelection is enabled, a single snapshot of the server ranking is fetched. This ranking is
+ * referenced to pick the best available server for each segment. The algorithm ends up picking the minimum number of
+ * servers required to process a query because it references a single snapshot of the server rankings. Currently,
+ * NUM_REPLICA_GROUPS_TO_QUERY is not supported if AdaptiveServerSelection is enabled.
  */
 public class ReplicaGroupInstanceSelector extends BaseInstanceSelector {
 
-  public ReplicaGroupInstanceSelector(String tableNameWithType, BrokerMetrics brokerMetrics) {
-    super(tableNameWithType, brokerMetrics);
+  public ReplicaGroupInstanceSelector(String tableNameWithType, BrokerMetrics brokerMetrics,
+      AdaptiveServerSelector adaptiveServerSelector) {
+    super(tableNameWithType, brokerMetrics, adaptiveServerSelector);
   }
 
   @Override
   Map<String, String> select(List<String> segments, int requestId,
-      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions) {
+      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions,
+      @Nullable AdaptiveServerSelector adaptiveServerSelector) {
     Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
+
+    List<String> serverRankList = new ArrayList<>();

Review Comment:
   This will create an extra list even when adaptive server selection is not enabled. Let's avoid that by wrapping the new logic under the if check.
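
   A minimal sketch of what "wrapping the new logic under the if check" could look like. `ServerRankingSource` and `ReplicaGroupSelectSketch` are hypothetical stand-ins, not the PR's actual `AdaptiveServerSelector` API, and the non-adaptive branch is simplified to a plain modulo pick. The only point illustrated is that the ranking list exists solely on the adaptive path.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for AdaptiveServerSelector; the real interface in the PR may differ.
interface ServerRankingSource {
  // Returns server names ordered from best to worst.
  List<String> fetchServerRanking();
}

class ReplicaGroupSelectSketch {
  static Map<String, String> select(List<String> segments, int requestId,
      Map<String, List<String>> segmentToEnabledInstancesMap, ServerRankingSource rankingSource) {
    Map<String, String> segmentToSelectedInstanceMap = new HashMap<>();
    if (rankingSource != null) {
      // The ranking snapshot is fetched, and its list created, only on the adaptive path.
      List<String> serverRankList = rankingSource.fetchServerRanking();
      for (String segment : segments) {
        List<String> enabledInstances = segmentToEnabledInstancesMap.get(segment);
        if (enabledInstances == null) {
          continue;
        }
        // Assumption: fall back to the first enabled instance if no ranked server serves this segment.
        String selected = enabledInstances.get(0);
        for (String server : serverRankList) {
          if (enabledInstances.contains(server)) {
            selected = server;
            break;
          }
        }
        segmentToSelectedInstanceMap.put(segment, selected);
      }
    } else {
      // Simplified non-adaptive path: the same replica index is used for every segment.
      for (String segment : segments) {
        List<String> enabledInstances = segmentToEnabledInstancesMap.get(segment);
        if (enabledInstances != null) {
          int index = Math.floorMod(requestId, enabledInstances.size());
          segmentToSelectedInstanceMap.put(segment, enabledInstances.get(index));
        }
      }
    }
    return segmentToSelectedInstanceMap;
  }
}
```

   With this shape, a `null` selector never touches a ranking list, so the disabled case pays no extra allocation.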



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java:
##########
@@ -47,34 +51,108 @@
  * selected such that half the segments will come from S1 and other half from S2. If NUM_REPLICA_GROUPS_TO_QUERY value
  * is much greater than available servers, then ReplicaGroupInstanceSelector will behave similar to
  * BalancedInstanceSelector.
+ * <p>If AdaptiveServerSelection is enabled, a single snapshot of the server ranking is fetched. This ranking is
+ * referenced to pick the best available server for each segment. The algorithm ends up picking the minimum number of
+ * servers required to process a query because it references a single snapshot of the server rankings. Currently,
+ * NUM_REPLICA_GROUPS_TO_QUERY is not supported if AdaptiveServerSelection is enabled.
  */
 public class ReplicaGroupInstanceSelector extends BaseInstanceSelector {
 
-  public ReplicaGroupInstanceSelector(String tableNameWithType, BrokerMetrics brokerMetrics) {
-    super(tableNameWithType, brokerMetrics);
+  public ReplicaGroupInstanceSelector(String tableNameWithType, BrokerMetrics brokerMetrics,
+      AdaptiveServerSelector adaptiveServerSelector) {

Review Comment:
   (minor) Annotate the `adaptiveServerSelector` parameter with `@Nullable`



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java:
##########
@@ -21,35 +21,55 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.utils.HashUtil;
 
 
 /**
  * Instance selector to balance the number of segments served by each selected server instance.
- * <p>The selection algorithm will always evenly distribute the traffic to all replicas of each segment, and will try
- * to select different replica id for each segment. The algorithm is very light-weight and will do best effort to
- * balance the number of segments served by each selected server instance.
+ * <p>If AdaptiveServerSelection is enabled, the request is routed to the best available server for a segment
+ * when it is processed below. This is a best effort approach in distributing the query to all available servers.
+ * If some servers are performing poorly, they might not end up being picked for any of the segments. For example,
+ * there's a query for Segments 1 (Seg1), 2 (Seg2) and 3 (Seg3). The servers are S1, S2, S3. The algorithm works as
+ * follows:
+ *    Step1: Process seg1. Fetch server rankings. Pick the best server.
+ *    Step2: Process seg2. Fetch server rankings (could have changed or not since Step 1). Pick the best server.
+ *    Step3: Process seg3. Fetch server rankings (could have changed or not since Step 2). Pick the best server.
+
+ * <p>If AdaptiveServerSelection is disabled, the selection algorithm will always evenly distribute the traffic to all
+ * replicas of each segment, and will try to select different replica id for each segment. The algorithm is very
+ * light-weight and will do best effort to balance the number of segments served by each selected server instance.
  */
 public class BalancedInstanceSelector extends BaseInstanceSelector {
 
-  public BalancedInstanceSelector(String tableNameWithType, BrokerMetrics brokerMetrics) {
-    super(tableNameWithType, brokerMetrics);
+  public BalancedInstanceSelector(String tableNameWithType, BrokerMetrics brokerMetrics,
+      @Nullable AdaptiveServerSelector adaptiveServerSelector) {
+    super(tableNameWithType, brokerMetrics, adaptiveServerSelector);
   }
 
   @Override
   Map<String, String> select(List<String> segments, int requestId,
-      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions) {
+      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions,
+      @Nullable AdaptiveServerSelector adaptiveServerSelector) {
     Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
     for (String segment : segments) {
       List<String> enabledInstances = segmentToEnabledInstancesMap.get(segment);
       // NOTE: enabledInstances can be null when there is no enabled instances for the segment, or the instance selector
       // has not been updated (we update all components for routing in sequence)
-      if (enabledInstances != null) {
-        int numEnabledInstances = enabledInstances.size();
-        segmentToSelectedInstanceMap.put(segment, enabledInstances.get(requestId++ % numEnabledInstances));
+      if (enabledInstances == null) {
+        continue;
+      }
+
+      String selectedServer = enabledInstances.get(requestId++ % enabledInstances.size());

Review Comment:
   Put this in the `else` clause
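
   A rough sketch of the suggested structure, assuming the adaptive branch follows this line in the PR (the excerpt does not show it). `BestServerPicker` and `BalancedSelectSketch` are hypothetical names for illustration only. The round-robin pick, including the `requestId++` side effect, runs only when no adaptive selector is present.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for AdaptiveServerSelector; the real interface in the PR may differ.
interface BestServerPicker {
  // Picks the best server among the candidates according to the current ranking.
  String pickBest(List<String> candidates);
}

class BalancedSelectSketch {
  static Map<String, String> select(List<String> segments, int requestId,
      Map<String, List<String>> segmentToEnabledInstancesMap, BestServerPicker picker) {
    Map<String, String> segmentToSelectedInstanceMap = new HashMap<>();
    for (String segment : segments) {
      List<String> enabledInstances = segmentToEnabledInstancesMap.get(segment);
      if (enabledInstances == null) {
        continue;
      }
      String selectedServer;
      if (picker != null) {
        // Adaptive path: rankings are consulted per segment, so they may change between segments.
        selectedServer = picker.pickBest(enabledInstances);
      } else {
        // Non-adaptive path: round-robin across replicas; requestId is assumed non-negative.
        selectedServer = enabledInstances.get(requestId++ % enabledInstances.size());
      }
      segmentToSelectedInstanceMap.put(segment, selectedServer);
    }
    return segmentToSelectedInstanceMap;
  }
}
```

   Keeping the round-robin pick inside `else` also avoids advancing `requestId` for a result that the adaptive path would discard.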



##########
pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java:
##########
@@ -42,6 +42,7 @@ public class AsyncQueryResponse implements QueryResponse {
   private final ConcurrentHashMap<ServerRoutingInstance, ServerResponse> _responseMap;
   private final CountDownLatch _countDownLatch;
   private final long _maxEndTimeMs;
+  private final long _queryTimeoutMs;

Review Comment:
   (nit) Rename to `_timeoutMs`



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -815,7 +815,7 @@ public synchronized PinotResourceManagerResponse deleteSegments(String tableName
   public synchronized PinotResourceManagerResponse deleteSegments(String tableNameWithType, List<String> segmentNames,
       @Nullable String retentionPeriod) {
     try {
-      LOGGER.info("Trying to delete segments: {} from table: {} ", segmentNames, tableNameWithType);
+        LOGGER.info("Trying to delete segments: {} from table: {} ", segmentNames, tableNameWithType);

Review Comment:
   Typo? This hunk only changes the indentation of the `LOGGER.info` line.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -291,5 +296,6 @@ public SelectionResult select(BrokerRequest brokerRequest, List<String> segments
    * ONLINE/CONSUMING instances). If enabled instances are not {@code null}, they are sorted in alphabetical order.
    */
   abstract Map<String, String> select(List<String> segments, int requestId,
-      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions);
+      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions,

Review Comment:
   No need to change this signature. We should make `_adaptiveServerSelector` protected so that the subclasses can access it directly.
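
   One way this could look, as a sketch only: `SelectorStub`, `BaseSelectorSketch` and `BalancedSelectorSketch` are hypothetical simplifications of the PR's classes. The abstract `select` keeps its original four parameters, and the subclass reads the protected field instead of receiving the selector as an argument.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for AdaptiveServerSelector; the real interface in the PR may differ.
interface SelectorStub {
  String select(List<String> candidates);
}

abstract class BaseSelectorSketch {
  // Protected (and possibly null) so that subclasses can read it directly.
  protected final SelectorStub _adaptiveServerSelector;

  BaseSelectorSketch(SelectorStub adaptiveServerSelector) {
    _adaptiveServerSelector = adaptiveServerSelector;
  }

  // Same four parameters as before the PR: no selector argument is threaded through.
  abstract Map<String, String> select(List<String> segments, int requestId,
      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions);
}

class BalancedSelectorSketch extends BaseSelectorSketch {
  BalancedSelectorSketch(SelectorStub adaptiveServerSelector) {
    super(adaptiveServerSelector);
  }

  @Override
  Map<String, String> select(List<String> segments, int requestId,
      Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions) {
    Map<String, String> result = new HashMap<>();
    for (String segment : segments) {
      List<String> enabledInstances = segmentToEnabledInstancesMap.get(segment);
      if (enabledInstances == null) {
        continue;
      }
      if (_adaptiveServerSelector != null) {
        // The field is inherited from the base class, not passed in.
        result.put(segment, _adaptiveServerSelector.select(enabledInstances));
      } else {
        // requestId is assumed non-negative here.
        result.put(segment, enabledInstances.get(requestId++ % enabledInstances.size()));
      }
    }
    return result;
  }
}
```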



##########
pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java:
##########
@@ -107,6 +114,16 @@ public Exception getException() {
     return _exception;
   }
 
+  @Override
+  public long getRequestId() {
+    return _requestId;
+  }
+
+  @Override
+  public long getTimeOutMs() {

Review Comment:
   (nit) Rename to `getTimeoutMs()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

