You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2024/02/01 07:17:17 UTC
(pinot) 04/05: Add logic to consider the case when instances are moved across pools
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch maintain-pool-selection-for-minimizeDataMovement
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 25b4c3a212b234106a92e30489b03f195d3eb60c
Author: jlli_LinkedIn <jl...@linkedin.com>
AuthorDate: Fri Dec 29 23:26:52 2023 -0800
Add logic to consider the case when instances are moved across pools
---
.../InstanceReplicaGroupPartitionSelector.java | 75 +++++++++++++++++----
.../instance/InstanceTagPoolSelector.java | 38 ++++++-----
.../InstanceReplicaGroupPartitionSelectorTest.java | 76 ++++++++++++++++++++--
.../java/org/apache/pinot/spi/utils/Pairs.java | 23 ++++++-
4 files changed, 171 insertions(+), 41 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
index 79e95db7a6..505006f1d3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
@@ -74,6 +74,9 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
int numReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups();
Preconditions.checkState(numReplicaGroups > 0, "Number of replica-groups must be positive");
Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
+ Map<Integer, Set<String>> existingPoolsToExistingInstancesMap = new TreeMap<>();
+ Map<Integer, Set<Integer>> existingPoolToExistingReplicaGroupIdsMap = new TreeMap<>();
+ Map<Integer, Set<String>> existingReplicaGroupIdToExistingInstancesMap = new TreeMap<>();
Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
Map<String, Integer> instanceToPoolMap = new HashMap<>();
@@ -89,26 +92,70 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
}
if (_minimizeDataMovement && _existingInstancePartitions != null) {
- // Keep the same pool for the replica group if it's already been used for the table.
+ // Collect the stats between the existing pools, existing replica groups, and existing instances.
int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
- int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups);
- for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; replicaGroupId++) {
- boolean foundExistingReplicaGroup = false;
- for (int partitionId = 0; partitionId < existingNumPartitions & !foundExistingReplicaGroup; partitionId++) {
+ for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+ for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
for (String existingInstance : existingInstances) {
Integer existingPool = instanceToPoolMap.get(existingInstance);
- if (existingPool != null & pools.contains(existingPool)) {
- poolToReplicaGroupIdsMap.computeIfAbsent(existingPool, k -> new ArrayList<>()).add(replicaGroupId);
- replicaGroupIdToPoolMap.put(replicaGroupId, existingPool);
- foundExistingReplicaGroup = true;
- break;
+ if (existingPool != null) {
+ existingPoolsToExistingInstancesMap.computeIfAbsent(existingPool, k -> new HashSet<>())
+ .add(existingInstance);
+ existingPoolToExistingReplicaGroupIdsMap.computeIfAbsent(existingPool, k -> new HashSet<>())
+ .add(replicaGroupId);
+ existingReplicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>())
+ .add(existingInstance);
+ }
+ }
+ }
+ }
+
+ // Use a max heap to track the number of servers used for the given pools,
+ // so that pool with max number of existing instances will be considered first.
+ PriorityQueue<Pairs.IntPair> maxHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator(false));
+ for (int pool : pools) {
+ maxHeap.add(
+ new Pairs.IntPair(existingPoolsToExistingInstancesMap.computeIfAbsent(pool, k -> new HashSet<>()).size(),
+ pool));
+ }
+
+ // Get the maximum number of replica groups per pool.
+ int maxNumberOfReplicaGroupPerPool = numReplicaGroups / pools.size();
+ // Given a pool number, assign replica group which has the max number of existing instances.
+ // Repeat this process until the max number of replica groups per pool is reached.
+ while (!maxHeap.isEmpty()) {
+ Pairs.IntPair pair = maxHeap.remove();
+ int poolNumber = pair.getRight();
+ for (int i = 0; i < maxNumberOfReplicaGroupPerPool; i++) {
+ Set<Integer> existingReplicaGroups = existingPoolToExistingReplicaGroupIdsMap.get(poolNumber);
+ if (existingReplicaGroups == null || existingReplicaGroups.isEmpty()) {
+ continue;
+ }
+ int targetReplicaGroupId = -1;
+ int maxNumInstances = 0;
+ for (int existingReplicaGroupId : existingReplicaGroups) {
+ int numExistingInstances =
+ existingReplicaGroupIdToExistingInstancesMap.getOrDefault(existingReplicaGroupId, new HashSet<>())
+ .size();
+ if (numExistingInstances > maxNumInstances) {
+ maxNumInstances = numExistingInstances;
+ targetReplicaGroupId = existingReplicaGroupId;
}
}
+ // If target existing replica group cannot be found, it means it should be chosen from a new replica group.
+ if (targetReplicaGroupId > -1) {
+ poolToReplicaGroupIdsMap.computeIfAbsent(poolNumber, k -> new ArrayList<>()).add(targetReplicaGroupId);
+ replicaGroupIdToPoolMap.put(targetReplicaGroupId, poolNumber);
+ // Clear the stats so that the same replica group won't be picked up again in later iteration.
+ existingReplicaGroupIdToExistingInstancesMap.get(targetReplicaGroupId).clear();
+ }
}
}
- // Use a min heap to track the least frequently picked pool among all the pools
+
+ // If there is any new replica group added, choose pool which is least frequently picked up.
+ // Use a min heap to track the least frequently picked pool among all the pools.
PriorityQueue<Pairs.IntPair> minHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator());
for (int pool : pools) {
int numExistingReplicaGroups =
@@ -190,7 +237,7 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups);
- Map<Integer, Set<String>> replicaGroupIdToExistingInstancesMap = new TreeMap<>();
+ existingReplicaGroupIdToExistingInstancesMap = new TreeMap<>();
// Step 1: find out the replica groups and their existing instances,
// so that these instances can be filtered out and won't be chosen for the other replica group.
for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; replicaGroupId++) {
@@ -202,7 +249,7 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
- replicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>())
+ existingReplicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>())
.addAll(existingInstances);
}
}
@@ -215,7 +262,7 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
otherReplicaGroupId < existingNumReplicaGroups && otherReplicaGroupId < numReplicaGroups;
otherReplicaGroupId++) {
if (replicaGroupId != otherReplicaGroupId) {
- candidateInstances.removeAll(replicaGroupIdToExistingInstancesMap.get(otherReplicaGroupId));
+ candidateInstances.removeAll(existingReplicaGroupIdToExistingInstancesMap.get(otherReplicaGroupId));
}
}
LinkedHashSet<String> chosenCandidateInstances = new LinkedHashSet<>();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
index 2062a75209..940968432b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
@@ -22,16 +22,18 @@ import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeMap;
-import java.util.TreeSet;
import javax.annotation.Nullable;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.utils.config.InstanceUtils;
import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
+import org.apache.pinot.spi.utils.Pairs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -123,36 +125,38 @@ public class InstanceTagPoolSelector {
poolsToSelect = new ArrayList<>(numPoolsToSelect);
if (_minimizeDataMovement && _existingInstancePartitions != null) {
- Set<Integer> existingPools = new TreeSet<>();
+ Map<Integer, Set<String>> existingPoolsToExistingInstancesMap = new TreeMap<>();
// Keep the same pool if it's already been used for the table.
int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
- boolean foundExistingPoolForReplicaGroup = false;
- for (int partitionId = 0; partitionId < existingNumPartitions & !foundExistingPoolForReplicaGroup;
- partitionId++) {
+ for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
for (String existingInstance : existingInstances) {
Integer existingPool = instanceToPoolMap.get(existingInstance);
if (existingPool != null) {
- if (existingPools.add(existingPool)) {
- poolsToSelect.add(existingPool);
+ if (!existingPoolsToExistingInstancesMap.containsKey(existingPool)) {
+ existingPoolsToExistingInstancesMap.put(existingPool, new HashSet<>());
}
- foundExistingPoolForReplicaGroup = true;
- break;
+ existingPoolsToExistingInstancesMap.computeIfAbsent(existingPool, k -> new HashSet<>())
+ .add(existingInstance);
}
}
}
}
- LOGGER.info("Keep the same pool: {} for table: {}", existingPools, _tableNameWithType);
- // Pick a pool from remainingPools that isn't used before.
- List<Integer> remainingPools = new ArrayList<>(pools);
- remainingPools.removeAll(existingPools);
- // Select from the remaining pools.
- int remainingNumPoolsToSelect = numPoolsToSelect - poolsToSelect.size();
- for (int i = 0; i < remainingNumPoolsToSelect; i++) {
- poolsToSelect.add(remainingPools.remove(i % remainingPools.size()));
+
+ // Use a max heap to track the number of servers used for all the pools.
+ PriorityQueue<Pairs.IntPair> maxHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator(false));
+ for (int pool : pools) {
+ maxHeap.add(new Pairs.IntPair(existingPoolsToExistingInstancesMap.get(pool).size(), pool));
+ }
+
+ // Pick the pools from the max heap, so that data movement be minimized.
+ for (int i = 0; i < numPoolsToSelect; i++) {
+ Pairs.IntPair pair = maxHeap.remove();
+ poolsToSelect.add(pair.getRight());
}
+ LOGGER.info("The selected pools: " + poolsToSelect);
} else {
// Select pools based on the table name hash to evenly distribute the tables
List<Integer> poolsInCluster = new ArrayList<>(pools);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
index 2fdef27796..fdb6292f26 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
@@ -34,6 +34,8 @@ import org.testng.annotations.Test;
public class InstanceReplicaGroupPartitionSelectorTest {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
private static final String INSTANCE_CONFIG_TEMPLATE =
"{\n" + " \"id\": \"Server_pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+ " \"simpleFields\": {\n" + " \"HELIX_ENABLED\": \"true\",\n"
@@ -51,15 +53,15 @@ public class InstanceReplicaGroupPartitionSelectorTest {
+ " ]\n" + " }\n" + "}";
@Test
- public void testSelectInstances() throws JsonProcessingException {
- ObjectMapper objectMapper = new ObjectMapper();
+ public void testPoolsWhenOneMorePoolAddedAndOneMoreReplicaGroupsNeeded()
+ throws JsonProcessingException {
String existingPartitionsJson =
" {\n" + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
+ " \"partitionToInstancesMap\": {\n" + " \"0_0\": [\n"
+ " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+ " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+ " ]\n" + " }\n" + " }\n";
- InstancePartitions existing = objectMapper.readValue(existingPartitionsJson, InstancePartitions.class);
+ InstancePartitions existing = OBJECT_MAPPER.readValue(existingPartitionsJson, InstancePartitions.class);
InstanceReplicaGroupPartitionConfig config =
new InstanceReplicaGroupPartitionConfig(true, 0, 2, 2, 1, 2, true, null);
@@ -68,8 +70,10 @@ public class InstanceReplicaGroupPartitionSelectorTest {
String[] serverNames = {"rg0-0", "rg0-1", "rg1-0", "rg1-1"};
String[] poolNumbers = {"0", "0", "1", "1"};
- String[] poolNames = {"FirstHalfReplicationGroups", "FirstHalfReplicationGroups", "SecondHalfReplicationGroups",
- "SecondHalfReplicationGroups"};
+ String[] poolNames = {
+ "FirstHalfReplicationGroups", "FirstHalfReplicationGroups", "SecondHalfReplicationGroups",
+ "SecondHalfReplicationGroups"
+ };
Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = new HashMap<>();
for (int i = 0; i < serverNames.length; i++) {
@@ -81,13 +85,15 @@ public class InstanceReplicaGroupPartitionSelectorTest {
StringSubstitutor substitutor = new StringSubstitutor(valuesMap);
String resolvedString = substitutor.replace(INSTANCE_CONFIG_TEMPLATE);
- ZNRecord znRecord = objectMapper.readValue(resolvedString, ZNRecord.class);
+ ZNRecord znRecord = OBJECT_MAPPER.readValue(resolvedString, ZNRecord.class);
int poolNumber = Integer.parseInt(poolNumbers[i]);
poolToInstanceConfigsMap.computeIfAbsent(poolNumber, k -> new ArrayList<>()).add(new InstanceConfig(znRecord));
}
InstancePartitions assignedPartitions = new InstancePartitions("0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE");
selector.selectInstances(poolToInstanceConfigsMap, assignedPartitions);
+ // Now that 1 more pool is added and 1 more RG is needed, a new set called "0_1" is generated,
+ // and the instances from Pool 1 are assigned to this new replica.
String expectedInstancePartitions =
" {\n" + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
+ " \"partitionToInstancesMap\": {\n" + " \"0_0\": [\n"
@@ -98,7 +104,63 @@ public class InstanceReplicaGroupPartitionSelectorTest {
+ " \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+ " ]\n" + " }\n" + " }\n";
InstancePartitions expectedPartitions =
- objectMapper.readValue(expectedInstancePartitions, InstancePartitions.class);
+ OBJECT_MAPPER.readValue(expectedInstancePartitions, InstancePartitions.class);
+ assert assignedPartitions.equals(expectedPartitions);
+ }
+
+ @Test
+ public void testSelectPoolsWhenExistingReplicaGroupMapsToMultiplePools()
+ throws JsonProcessingException {
+ // The "rg0-2" instance used to belong to Pool 1, but now it belongs to Pool 0.
+ String existingPartitionsJson =
+ " {\n" + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
+ + " \"partitionToInstancesMap\": {\n" + " \"0_0\": [\n"
+ + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+ + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+ + " ],\n" + " \"0_1\": [\n"
+ + " \"Server_pinot-server-rg0-2.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+ + " \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+ + " ]\n" + " }\n" + " }\n";
+ InstancePartitions existing = OBJECT_MAPPER.readValue(existingPartitionsJson, InstancePartitions.class);
+ InstanceReplicaGroupPartitionConfig config =
+ new InstanceReplicaGroupPartitionConfig(true, 0, 2, 2, 1, 2, true, null);
+
+ InstanceReplicaGroupPartitionSelector selector =
+ new InstanceReplicaGroupPartitionSelector(config, "tableNameBlah", existing, true);
+
+ String[] serverNames = {"rg0-0", "rg0-1", "rg0-2", "rg1-0", "rg1-1", "rg1-2"};
+ String[] poolNumbers = {"0", "0", "0", "1", "1", "1"};
+ Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = new HashMap<>();
+
+ for (int i = 0; i < serverNames.length; i++) {
+ Map<String, String> valuesMap = new HashMap<>();
+ valuesMap.put("serverName", serverNames[i]);
+ valuesMap.put("pool", poolNumbers[i]);
+
+ StringSubstitutor substitutor = new StringSubstitutor(valuesMap);
+ String resolvedString = substitutor.replace(INSTANCE_CONFIG_TEMPLATE);
+
+ ZNRecord znRecord = OBJECT_MAPPER.readValue(resolvedString, ZNRecord.class);
+ int poolNumber = Integer.parseInt(poolNumbers[i]);
+ poolToInstanceConfigsMap.computeIfAbsent(poolNumber, k -> new ArrayList<>()).add(new InstanceConfig(znRecord));
+ }
+
+ InstancePartitions assignedPartitions = new InstancePartitions("0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE");
+ selector.selectInstances(poolToInstanceConfigsMap, assignedPartitions);
+
+ // The "rg0-2" instance is replaced by "rg1-0" (which belongs to Pool 1), as "rg0-2" no longer belongs to Pool 1.
+ // And "rg1-0" remains the same position as it's always under Pool 1.
+ String expectedInstancePartitions =
+ " {\n" + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
+ + " \"partitionToInstancesMap\": {\n" + " \"0_0\": [\n"
+ + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+ + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+ + " ],\n" + " \"0_1\": [\n"
+ + " \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+ + " \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+ + " ]\n" + " }\n" + " }\n";
+ InstancePartitions expectedPartitions =
+ OBJECT_MAPPER.readValue(expectedInstancePartitions, InstancePartitions.class);
assert assignedPartitions.equals(expectedPartitions);
}
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java
index be18d35e50..45645387af 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java
@@ -30,7 +30,11 @@ public class Pairs {
}
public static Comparator<IntPair> intPairComparator() {
- return new AscendingIntPairComparator();
+ return new AscendingIntPairComparator(true);
+ }
+
+ public static Comparator<IntPair> intPairComparator(boolean ascending) {
+ return new AscendingIntPairComparator(ascending);
}
public static class IntPair {
@@ -79,13 +83,26 @@ public class Pairs {
}
public static class AscendingIntPairComparator implements Comparator<IntPair> {
+ private boolean _ascending;
+
+ public AscendingIntPairComparator(boolean ascending) {
+ _ascending = ascending;
+ }
@Override
public int compare(IntPair pair1, IntPair pair2) {
if (pair1._left != pair2._left) {
- return Integer.compare(pair1._left, pair2._left);
+ if (_ascending) {
+ return Integer.compare(pair1._left, pair2._left);
+ } else {
+ return Integer.compare(pair2._left, pair1._left);
+ }
} else {
- return Integer.compare(pair1._right, pair2._right);
+ if (_ascending) {
+ return Integer.compare(pair1._right, pair2._right);
+ } else {
+ return Integer.compare(pair2._right, pair1._right);
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org