You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@pinot.apache.org by jl...@apache.org on 2023/11/03 22:54:13 UTC

(pinot) 01/01: Fix the NPE in minimizeDataMovement instance assignment strategy

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch fix-npe-for-minimizeDataMovement
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit d9832a4ac443f7cd4fed1f3fa4db55f49507a86a
Author: jlli_LinkedIn <jl...@linkedin.com>
AuthorDate: Fri Nov 3 15:48:31 2023 -0700

    Fix the NPE in minimizeDataMovement instance assignment strategy
---
 .../InstanceReplicaGroupPartitionSelector.java     |  11 ++-
 .../InstanceReplicaGroupPartitionSelectorTest.java | 105 +++++++++++++++++++++
 2 files changed, 111 insertions(+), 5 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
index b6c62ac12e..de1e681d17 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
@@ -72,11 +72,17 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
       Preconditions.checkState(numReplicaGroups > 0, "Number of replica-groups must be positive");
       Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
       Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
+      Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
       for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
         // Pick one pool for each replica-group based on the table name hash
         int pool = pools.get((tableNameHash + replicaId) % numPools);
         poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
         replicaGroupIdToPoolMap.put(replicaId, pool);
+
+        Set<String> candidateInstances =
+            poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
+        List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool);
+        instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName()));
       }
       LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", numReplicaGroups, poolToReplicaGroupIdsMap,
           _tableNameWithType);
@@ -132,7 +138,6 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
         int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
         int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups);
 
-        Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
         Map<Integer, Set<String>> replicaGroupIdToExistingInstancesMap = new TreeMap<>();
         // Step 1: find out the replica groups and their existing instances,
         //   so that these instances can be filtered out and won't be chosen for the other replica group.
@@ -142,10 +147,6 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
             // Skip the replica group if it's no longer needed.
             continue;
           }
-          Set<String> candidateInstances =
-              poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
-          List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool);
-          instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName()));
 
           for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
             List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
new file mode 100644
index 0000000000..4f982c45e6
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.instance;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.text.StringSubstitutor;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class InstanceReplicaGroupPartitionSelectorTest {
+  // Helix InstanceConfig JSON for one server; ${serverName}, ${pool} and ${poolName} are substituted per server.
+  private static final String INSTANCE_CONFIG_TEMPLATE =
+      "{\n" + "  \"id\": \"Server_pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+          + "  \"simpleFields\": {\n" + "    \"HELIX_ENABLED\": \"true\",\n"
+          + "    \"HELIX_ENABLED_TIMESTAMP\": \"1688959934305\",\n"
+          + "    \"HELIX_HOST\": \"pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local\",\n"
+          + "    \"HELIX_PORT\": \"8098\",\n" + "    \"adminPort\": \"8097\",\n" + "    \"grpcPort\": \"8090\",\n"
+          + "    \"queryMailboxPort\": \"46347\",\n" + "    \"queryServerPort\": \"45031\",\n"
+          + "    \"shutdownInProgress\": \"false\"\n" + "  },\n" + "  \"mapFields\": {\n"
+          + "    \"SYSTEM_RESOURCE_INFO\": {\n" + "      \"numCores\": \"16\",\n"
+          + "      \"totalMemoryMB\": \"126976\",\n" + "      \"maxHeapSizeMB\": \"65536\"\n" + "    },\n"
+          + "    \"pool\": {\n" + "      \"DefaultTenant_OFFLINE\": \"${pool}\",\n"
+          + "      \"${poolName}\": \"${pool}\",\n" + "      \"AllReplicationGroups\": \"1\"\n" + "    }\n" + "  },\n"
+          + "  \"listFields\": {\n" + "    \"TAG_LIST\": [\n" + "      \"DefaultTenant_OFFLINE\",\n"
+          + "      \"DefaultTenant_REALTIME\",\n" + "      \"${poolName}\",\n" + "      \"AllReplicationGroups\"\n"
+          + "    ]\n" + "  }\n" + "}";
+
+  /**
+   * Growing from 1 to 2 replica groups with minimizeDataMovement must fill the new one from pool 1 without an NPE.
+   */
+  @Test
+  public void testSelectInstances() throws JsonProcessingException {
+    ObjectMapper objectMapper = new ObjectMapper();
+    String existingPartitionsJson =
+        "    {\n" + "      \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
+            + "      \"partitionToInstancesMap\": {\n" + "        \"0_0\": [\n"
+            + "          \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+            + "          \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+            + "        ]\n" + "      }\n" + "    }\n";
+    InstancePartitions existing = objectMapper.readValue(existingPartitionsJson, InstancePartitions.class);
+    InstanceReplicaGroupPartitionConfig config =
+        new InstanceReplicaGroupPartitionConfig(true, 0, 2, 2, 1, 2, true, null);
+
+    InstanceReplicaGroupPartitionSelector selector =
+        new InstanceReplicaGroupPartitionSelector(config, "tableNameBlah", existing);
+
+    String[] serverNames = {"rg0-0", "rg0-1", "rg1-0", "rg1-1"};
+    int[] pools = {0, 0, 1, 1};
+    String[] poolNames = {"FirstHalfReplicationGroups", "FirstHalfReplicationGroups", "SecondHalfReplicationGroups",
+        "SecondHalfReplicationGroups"};
+    Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = new HashMap<>();
+
+    for (int i = 0; i < serverNames.length; i++) {
+      Map<String, String> valuesMap = new HashMap<>();
+      valuesMap.put("serverName", serverNames[i]);
+      valuesMap.put("pool", String.valueOf(pools[i]));
+      valuesMap.put("poolName", poolNames[i]);
+      String resolvedString = new StringSubstitutor(valuesMap).replace(INSTANCE_CONFIG_TEMPLATE);
+      ZNRecord znRecord = objectMapper.readValue(resolvedString, ZNRecord.class);
+      poolToInstanceConfigsMap.computeIfAbsent(pools[i], k -> new ArrayList<>()).add(new InstanceConfig(znRecord));
+    }
+    InstancePartitions assignedPartitions = new InstancePartitions("0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE");
+    selector.selectInstances(poolToInstanceConfigsMap, assignedPartitions);
+
+    String expectedInstancePartitions =
+        "    {\n" + "      \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
+            + "      \"partitionToInstancesMap\": {\n" + "        \"0_0\": [\n"
+            + "          \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+            + "          \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+            + "        ],\n" + "        \"0_1\": [\n"
+            + "          \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+            + "          \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+            + "        ]\n" + "      }\n" + "  }\n";
+    InstancePartitions expectedPartitions =
+        objectMapper.readValue(expectedInstancePartitions, InstancePartitions.class);
+    assertEquals(assignedPartitions, expectedPartitions);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org