You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2024/02/01 07:17:17 UTC

(pinot) 04/05: Add logic to consider the case when instances are moved across pools

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch maintain-pool-selection-for-minimizeDataMovement
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 25b4c3a212b234106a92e30489b03f195d3eb60c
Author: jlli_LinkedIn <jl...@linkedin.com>
AuthorDate: Fri Dec 29 23:26:52 2023 -0800

    Add logic to consider the case when instances are moved across pools
---
 .../InstanceReplicaGroupPartitionSelector.java     | 75 +++++++++++++++++----
 .../instance/InstanceTagPoolSelector.java          | 38 ++++++-----
 .../InstanceReplicaGroupPartitionSelectorTest.java | 76 ++++++++++++++++++++--
 .../java/org/apache/pinot/spi/utils/Pairs.java     | 23 ++++++-
 4 files changed, 171 insertions(+), 41 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
index 79e95db7a6..505006f1d3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
@@ -74,6 +74,9 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
       int numReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups();
       Preconditions.checkState(numReplicaGroups > 0, "Number of replica-groups must be positive");
       Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
+      Map<Integer, Set<String>> existingPoolsToExistingInstancesMap = new TreeMap<>();
+      Map<Integer, Set<Integer>> existingPoolToExistingReplicaGroupIdsMap = new TreeMap<>();
+      Map<Integer, Set<String>> existingReplicaGroupIdToExistingInstancesMap = new TreeMap<>();
       Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
       Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
       Map<String, Integer> instanceToPoolMap = new HashMap<>();
@@ -89,26 +92,70 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
       }
 
       if (_minimizeDataMovement && _existingInstancePartitions != null) {
-        // Keep the same pool for the replica group if it's already been used for the table.
+        // Collect the mappings among existing pools, existing replica groups, and existing instances.
         int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
         int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
-        int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups);
-        for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; replicaGroupId++) {
-          boolean foundExistingReplicaGroup = false;
-          for (int partitionId = 0; partitionId < existingNumPartitions & !foundExistingReplicaGroup; partitionId++) {
+        for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+          for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
             List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
             for (String existingInstance : existingInstances) {
               Integer existingPool = instanceToPoolMap.get(existingInstance);
-              if (existingPool != null & pools.contains(existingPool)) {
-                poolToReplicaGroupIdsMap.computeIfAbsent(existingPool, k -> new ArrayList<>()).add(replicaGroupId);
-                replicaGroupIdToPoolMap.put(replicaGroupId, existingPool);
-                foundExistingReplicaGroup = true;
-                break;
+              if (existingPool != null) {
+                existingPoolsToExistingInstancesMap.computeIfAbsent(existingPool, k -> new HashSet<>())
+                    .add(existingInstance);
+                existingPoolToExistingReplicaGroupIdsMap.computeIfAbsent(existingPool, k -> new HashSet<>())
+                    .add(replicaGroupId);
+                existingReplicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>())
+                    .add(existingInstance);
+              }
+            }
+          }
+        }
+
+        // Use a max heap to track the number of existing servers in each pool,
+        // so that the pool with the most existing instances is considered first.
+        PriorityQueue<Pairs.IntPair> maxHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator(false));
+        for (int pool : pools) {
+          maxHeap.add(
+              new Pairs.IntPair(existingPoolsToExistingInstancesMap.computeIfAbsent(pool, k -> new HashSet<>()).size(),
+                  pool));
+        }
+
+        // Get the maximum number of replica groups per pool.
+        int maxNumberOfReplicaGroupPerPool = numReplicaGroups / pools.size();
+        // For each pool, assign the existing replica group (among those with instances in that pool) that has the
+        // most existing instances. Repeat until the maximum number of replica groups per pool is reached.
+        while (!maxHeap.isEmpty()) {
+          Pairs.IntPair pair = maxHeap.remove();
+          int poolNumber = pair.getRight();
+          for (int i = 0; i < maxNumberOfReplicaGroupPerPool; i++) {
+            Set<Integer> existingReplicaGroups = existingPoolToExistingReplicaGroupIdsMap.get(poolNumber);
+            if (existingReplicaGroups == null || existingReplicaGroups.isEmpty()) {
+              continue;
+            }
+            int targetReplicaGroupId = -1;
+            int maxNumInstances = 0;
+            for (int existingReplicaGroupId : existingReplicaGroups) {
+              int numExistingInstances =
+                  existingReplicaGroupIdToExistingInstancesMap.getOrDefault(existingReplicaGroupId, new HashSet<>())
+                      .size();
+              if (numExistingInstances > maxNumInstances) {
+                maxNumInstances = numExistingInstances;
+                targetReplicaGroupId = existingReplicaGroupId;
               }
             }
+            // If no existing replica group is found, the replica group for this pool must come from a new one.
+            if (targetReplicaGroupId > -1) {
+              poolToReplicaGroupIdsMap.computeIfAbsent(poolNumber, k -> new ArrayList<>()).add(targetReplicaGroupId);
+              replicaGroupIdToPoolMap.put(targetReplicaGroupId, poolNumber);
+              // Clear the stats so that the same replica group won't be picked again in later iterations.
+              existingReplicaGroupIdToExistingInstancesMap.get(targetReplicaGroupId).clear();
+            }
           }
         }
-        // Use a min heap to track the least frequently picked pool among all the pools
+
+        // For any newly added replica group, choose the pool that has been picked least frequently.
+        // Use a min heap to track the least frequently picked pool among all the pools.
         PriorityQueue<Pairs.IntPair> minHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator());
         for (int pool : pools) {
           int numExistingReplicaGroups =
@@ -190,7 +237,7 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
         int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
         int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups);
 
-        Map<Integer, Set<String>> replicaGroupIdToExistingInstancesMap = new TreeMap<>();
+        existingReplicaGroupIdToExistingInstancesMap = new TreeMap<>();
         // Step 1: find out the replica groups and their existing instances,
         //   so that these instances can be filtered out and won't be chosen for the other replica group.
         for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; replicaGroupId++) {
@@ -202,7 +249,7 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
 
           for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
             List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
-            replicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>())
+            existingReplicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>())
                 .addAll(existingInstances);
           }
         }
@@ -215,7 +262,7 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
               otherReplicaGroupId < existingNumReplicaGroups && otherReplicaGroupId < numReplicaGroups;
               otherReplicaGroupId++) {
             if (replicaGroupId != otherReplicaGroupId) {
-              candidateInstances.removeAll(replicaGroupIdToExistingInstancesMap.get(otherReplicaGroupId));
+              candidateInstances.removeAll(existingReplicaGroupIdToExistingInstancesMap.get(otherReplicaGroupId));
             }
           }
           LinkedHashSet<String> chosenCandidateInstances = new LinkedHashSet<>();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
index 2062a75209..940968432b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
@@ -22,16 +22,18 @@ import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.TreeSet;
 import javax.annotation.Nullable;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.utils.config.InstanceUtils;
 import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
+import org.apache.pinot.spi.utils.Pairs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -123,36 +125,38 @@ public class InstanceTagPoolSelector {
 
         poolsToSelect = new ArrayList<>(numPoolsToSelect);
         if (_minimizeDataMovement && _existingInstancePartitions != null) {
-          Set<Integer> existingPools = new TreeSet<>();
+          Map<Integer, Set<String>> existingPoolsToExistingInstancesMap = new TreeMap<>();
           // Keep the same pool if it's already been used for the table.
           int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
           int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
           for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
-            boolean foundExistingPoolForReplicaGroup = false;
-            for (int partitionId = 0; partitionId < existingNumPartitions & !foundExistingPoolForReplicaGroup;
-                partitionId++) {
+            for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
               List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
               for (String existingInstance : existingInstances) {
                 Integer existingPool = instanceToPoolMap.get(existingInstance);
                 if (existingPool != null) {
-                  if (existingPools.add(existingPool)) {
-                    poolsToSelect.add(existingPool);
+                  if (!existingPoolsToExistingInstancesMap.containsKey(existingPool)) {
+                    existingPoolsToExistingInstancesMap.put(existingPool, new HashSet<>());
                   }
-                  foundExistingPoolForReplicaGroup = true;
-                  break;
+                  existingPoolsToExistingInstancesMap.computeIfAbsent(existingPool, k -> new HashSet<>())
+                      .add(existingInstance);
                 }
               }
             }
           }
-          LOGGER.info("Keep the same pool: {} for table: {}", existingPools, _tableNameWithType);
-          // Pick a pool from remainingPools that isn't used before.
-          List<Integer> remainingPools = new ArrayList<>(pools);
-          remainingPools.removeAll(existingPools);
-          // Select from the remaining pools.
-          int remainingNumPoolsToSelect = numPoolsToSelect - poolsToSelect.size();
-          for (int i = 0; i < remainingNumPoolsToSelect; i++) {
-            poolsToSelect.add(remainingPools.remove(i % remainingPools.size()));
+
+          // Use a max heap to track the number of existing servers in each pool.
+          PriorityQueue<Pairs.IntPair> maxHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator(false));
+          for (int pool : pools) {
+            maxHeap.add(new Pairs.IntPair(existingPoolsToExistingInstancesMap.get(pool).size(), pool));
+          }
+
+          // Pick the pools with the most existing servers first, so that data movement is minimized.
+          for (int i = 0; i < numPoolsToSelect; i++) {
+            Pairs.IntPair pair = maxHeap.remove();
+            poolsToSelect.add(pair.getRight());
           }
+          LOGGER.info("The selected pools: " + poolsToSelect);
         } else {
           // Select pools based on the table name hash to evenly distribute the tables
           List<Integer> poolsInCluster = new ArrayList<>(pools);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
index 2fdef27796..fdb6292f26 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
@@ -34,6 +34,8 @@ import org.testng.annotations.Test;
 
 public class InstanceReplicaGroupPartitionSelectorTest {
 
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
   private static final String INSTANCE_CONFIG_TEMPLATE =
       "{\n" + "  \"id\": \"Server_pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
           + "  \"simpleFields\": {\n" + "    \"HELIX_ENABLED\": \"true\",\n"
@@ -51,15 +53,15 @@ public class InstanceReplicaGroupPartitionSelectorTest {
           + "    ]\n" + "  }\n" + "}";
 
   @Test
-  public void testSelectInstances() throws JsonProcessingException {
-    ObjectMapper objectMapper = new ObjectMapper();
+  public void testPoolsWhenOneMorePoolAddedAndOneMoreReplicaGroupsNeeded()
+      throws JsonProcessingException {
     String existingPartitionsJson =
         "    {\n" + "      \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
             + "      \"partitionToInstancesMap\": {\n" + "        \"0_0\": [\n"
             + "          \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
             + "          \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
             + "        ]\n" + "      }\n" + "    }\n";
-    InstancePartitions existing = objectMapper.readValue(existingPartitionsJson, InstancePartitions.class);
+    InstancePartitions existing = OBJECT_MAPPER.readValue(existingPartitionsJson, InstancePartitions.class);
     InstanceReplicaGroupPartitionConfig config =
         new InstanceReplicaGroupPartitionConfig(true, 0, 2, 2, 1, 2, true, null);
 
@@ -68,8 +70,10 @@ public class InstanceReplicaGroupPartitionSelectorTest {
 
     String[] serverNames = {"rg0-0", "rg0-1", "rg1-0", "rg1-1"};
     String[] poolNumbers = {"0", "0", "1", "1"};
-    String[] poolNames = {"FirstHalfReplicationGroups", "FirstHalfReplicationGroups", "SecondHalfReplicationGroups",
-        "SecondHalfReplicationGroups"};
+    String[] poolNames = {
+        "FirstHalfReplicationGroups", "FirstHalfReplicationGroups", "SecondHalfReplicationGroups",
+        "SecondHalfReplicationGroups"
+    };
     Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = new HashMap<>();
 
     for (int i = 0; i < serverNames.length; i++) {
@@ -81,13 +85,15 @@ public class InstanceReplicaGroupPartitionSelectorTest {
       StringSubstitutor substitutor = new StringSubstitutor(valuesMap);
       String resolvedString = substitutor.replace(INSTANCE_CONFIG_TEMPLATE);
 
-      ZNRecord znRecord = objectMapper.readValue(resolvedString, ZNRecord.class);
+      ZNRecord znRecord = OBJECT_MAPPER.readValue(resolvedString, ZNRecord.class);
       int poolNumber = Integer.parseInt(poolNumbers[i]);
       poolToInstanceConfigsMap.computeIfAbsent(poolNumber, k -> new ArrayList<>()).add(new InstanceConfig(znRecord));
     }
     InstancePartitions assignedPartitions = new InstancePartitions("0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE");
     selector.selectInstances(poolToInstanceConfigsMap, assignedPartitions);
 
+    // Now that 1 more pool is added and 1 more replica group is needed, a new replica group "0_1" is generated,
+    // and the instances from Pool 1 are assigned to this new replica group.
     String expectedInstancePartitions =
         "    {\n" + "      \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
             + "      \"partitionToInstancesMap\": {\n" + "        \"0_0\": [\n"
@@ -98,7 +104,63 @@ public class InstanceReplicaGroupPartitionSelectorTest {
             + "          \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
             + "        ]\n" + "      }\n" + "  }\n";
     InstancePartitions expectedPartitions =
-        objectMapper.readValue(expectedInstancePartitions, InstancePartitions.class);
+        OBJECT_MAPPER.readValue(expectedInstancePartitions, InstancePartitions.class);
+    assert assignedPartitions.equals(expectedPartitions);
+  }
+
+  @Test
+  public void testSelectPoolsWhenExistingReplicaGroupMapsToMultiplePools()
+      throws JsonProcessingException {
+    // The "rg0-2" instance used to belong to Pool 1, but now it belongs to Pool 0.
+    String existingPartitionsJson =
+        "    {\n" + "      \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
+            + "      \"partitionToInstancesMap\": {\n" + "        \"0_0\": [\n"
+            + "          \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+            + "          \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+            + "        ],\n" + "        \"0_1\": [\n"
+            + "          \"Server_pinot-server-rg0-2.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+            + "          \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+            + "        ]\n" + "      }\n" + "  }\n";
+    InstancePartitions existing = OBJECT_MAPPER.readValue(existingPartitionsJson, InstancePartitions.class);
+    InstanceReplicaGroupPartitionConfig config =
+        new InstanceReplicaGroupPartitionConfig(true, 0, 2, 2, 1, 2, true, null);
+
+    InstanceReplicaGroupPartitionSelector selector =
+        new InstanceReplicaGroupPartitionSelector(config, "tableNameBlah", existing, true);
+
+    String[] serverNames = {"rg0-0", "rg0-1", "rg0-2", "rg1-0", "rg1-1", "rg1-2"};
+    String[] poolNumbers = {"0", "0", "0", "1", "1", "1"};
+    Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = new HashMap<>();
+
+    for (int i = 0; i < serverNames.length; i++) {
+      Map<String, String> valuesMap = new HashMap<>();
+      valuesMap.put("serverName", serverNames[i]);
+      valuesMap.put("pool", poolNumbers[i]);
+
+      StringSubstitutor substitutor = new StringSubstitutor(valuesMap);
+      String resolvedString = substitutor.replace(INSTANCE_CONFIG_TEMPLATE);
+
+      ZNRecord znRecord = OBJECT_MAPPER.readValue(resolvedString, ZNRecord.class);
+      int poolNumber = Integer.parseInt(poolNumbers[i]);
+      poolToInstanceConfigsMap.computeIfAbsent(poolNumber, k -> new ArrayList<>()).add(new InstanceConfig(znRecord));
+    }
+
+    InstancePartitions assignedPartitions = new InstancePartitions("0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE");
+    selector.selectInstances(poolToInstanceConfigsMap, assignedPartitions);
+
+    // The "rg0-2" instance is replaced by "rg1-1" (which belongs to Pool 1), as "rg0-2" no longer belongs to Pool 1.
+    // And "rg1-0" remains in the same position, as it has always been in Pool 1.
+    String expectedInstancePartitions =
+        "    {\n" + "      \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
+            + "      \"partitionToInstancesMap\": {\n" + "        \"0_0\": [\n"
+            + "          \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+            + "          \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+            + "        ],\n" + "        \"0_1\": [\n"
+            + "          \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+            + "          \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+            + "        ]\n" + "      }\n" + "  }\n";
+    InstancePartitions expectedPartitions =
+        OBJECT_MAPPER.readValue(expectedInstancePartitions, InstancePartitions.class);
     assert assignedPartitions.equals(expectedPartitions);
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java
index be18d35e50..45645387af 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java
@@ -30,7 +30,11 @@ public class Pairs {
   }
 
   public static Comparator<IntPair> intPairComparator() {
-    return new AscendingIntPairComparator();
+    return new AscendingIntPairComparator(true);
+  }
+
+  public static Comparator<IntPair> intPairComparator(boolean ascending) {
+    return new AscendingIntPairComparator(ascending);
   }
 
   public static class IntPair {
@@ -79,13 +83,26 @@ public class Pairs {
   }
 
   public static class AscendingIntPairComparator implements Comparator<IntPair> {
+    private boolean _ascending;
+
+    public AscendingIntPairComparator(boolean ascending) {
+      _ascending = ascending;
+    }
 
     @Override
     public int compare(IntPair pair1, IntPair pair2) {
       if (pair1._left != pair2._left) {
-        return Integer.compare(pair1._left, pair2._left);
+        if (_ascending) {
+          return Integer.compare(pair1._left, pair2._left);
+        } else {
+          return Integer.compare(pair2._left, pair1._left);
+        }
       } else {
-        return Integer.compare(pair1._right, pair2._right);
+        if (_ascending) {
+          return Integer.compare(pair1._right, pair2._right);
+        } else {
+          return Integer.compare(pair2._right, pair1._right);
+        }
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org