You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2022/02/22 19:31:53 UTC

[helix] 02/04: Introduce VirtualTopologyGroup and its assignment logic with benchmark. (#1948)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 199d997a18f7ca3f911a0a8cc219dae5009d5aa2
Author: Qi (Quincy) Qu <qq...@linkedin.com>
AuthorDate: Thu Feb 3 12:18:46 2022 -0800

    Introduce VirtualTopologyGroup and its assignment logic with benchmark. (#1948)
    
    
    
    * Clean up unused assignment schemes and make minor changes.
    
    * Further refactor and code cleanup.
---
 .../constants/VirtualTopologyGroupConstants.java   | 29 +++++++
 .../FifoVirtualGroupAssignmentAlgorithm.java       | 79 ++++++++++++++++++
 .../topology/VirtualGroupAssignmentAlgorithm.java  | 38 +++++++++
 .../main/java/org/apache/helix/util/HelixUtil.java | 14 ++++
 .../TestVirtualTopologyGroupAssignment.java        | 94 ++++++++++++++++++++++
 .../rebalancer/waged/model/TestAssignableNode.java | 16 ++++
 6 files changed, 270 insertions(+)

diff --git a/helix-core/src/main/java/org/apache/helix/cloud/constants/VirtualTopologyGroupConstants.java b/helix-core/src/main/java/org/apache/helix/cloud/constants/VirtualTopologyGroupConstants.java
new file mode 100644
index 0000000..a92e195
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/cloud/constants/VirtualTopologyGroupConstants.java
@@ -0,0 +1,29 @@
+package org.apache.helix.cloud.constants;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+/**
+ * String constants used by the virtual topology group feature.
+ * This class only holds constants and cannot be instantiated or extended.
+ */
+public final class VirtualTopologyGroupConstants {
+  public static final String GROUP_NAME = "virtualTopologyGroupName";
+  public static final String GROUP_NUMBER = "virtualTopologyGroupNumber";
+  /** Separator placed between a virtual group name and its group index when building a virtual group ID. */
+  public static final String GROUP_NAME_SPLITTER = "_";
+  public static final String PATH_NAME_SPLITTER = "/";
+  public static final String VIRTUAL_FAULT_ZONE_TYPE = "virtualZone";
+
+  private VirtualTopologyGroupConstants() {
+    // Constants holder: not meant to be instantiated.
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/topology/FifoVirtualGroupAssignmentAlgorithm.java b/helix-core/src/main/java/org/apache/helix/cloud/topology/FifoVirtualGroupAssignmentAlgorithm.java
new file mode 100644
index 0000000..23da847
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/cloud/topology/FifoVirtualGroupAssignmentAlgorithm.java
@@ -0,0 +1,79 @@
+package org.apache.helix.cloud.topology;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.cloud.constants.VirtualTopologyGroupConstants;
+import org.apache.helix.util.HelixUtil;
+
+
+/**
+ * A strategy that densely assigns the input instances to virtual groups: it does not move on to the next
+ * group until the current one is filled.
+ * Given that instances.size = instancesPerGroup * numGroups + residuals,
+ * the [residuals] extra instances are given one each to the first [residuals] groups, so each virtual
+ * group will have either [instancesPerGroup] or [instancesPerGroup + 1] instances.
+ */
+public class FifoVirtualGroupAssignmentAlgorithm implements VirtualGroupAssignmentAlgorithm {
+  private static final FifoVirtualGroupAssignmentAlgorithm _instance = new FifoVirtualGroupAssignmentAlgorithm();
+
+  private FifoVirtualGroupAssignmentAlgorithm() { }
+
+  /**
+   * @return the single shared instance of this algorithm
+   */
+  public static FifoVirtualGroupAssignmentAlgorithm getInstance() {
+    return _instance;
+  }
+
+  /**
+   * Flatten the zone mapping into one sorted instance list and cut it into {@code numGroups}
+   * consecutive slices, one per virtual group.
+   *
+   * @param numGroups number of the virtual groups, must be positive
+   * @param virtualGroupName virtual group name, used as the prefix of every virtual group ID
+   * @param zoneMapping current zone mapping from zoneId to instanceIds
+   * @return an immutable map from virtual group ID to the instanceIds assigned to that group; it has
+   *         exactly {@code numGroups} entries, some of which are empty when there are fewer instances
+   *         than groups
+   * @throws IllegalArgumentException if {@code numGroups} is not positive
+   */
+  @Override
+  public Map<String, Set<String>> computeAssignment(int numGroups, String virtualGroupName,
+      Map<String, Set<String>> zoneMapping) {
+    // Fail fast with a descriptive error instead of an ArithmeticException from the division below.
+    Preconditions.checkArgument(numGroups > 0, "numGroups must be positive, but got %s", numGroups);
+    List<String> sortedInstances = HelixUtil.sortAndFlattenZoneMapping(zoneMapping);
+    // #instances = instancesPerGroupBase * numGroups + residuals
+    int instancesPerGroupBase = sortedInstances.size() / numGroups;
+    int residuals = sortedInstances.size() % numGroups; // assign across the first #residuals groups
+    Map<String, Set<String>> assignment = new HashMap<>();
+    int groupStart = 0;
+    for (int groupInd = 0; groupInd < numGroups; groupInd++) {
+      int groupSize = groupInd < residuals
+          ? instancesPerGroupBase + 1
+          : instancesPerGroupBase;
+      int groupEnd = groupStart + groupSize;
+      // Copy the slice so the group's set does not stay backed by the flattened list.
+      assignment.put(computeVirtualGroupId(groupInd, virtualGroupName),
+          new HashSet<>(sortedInstances.subList(groupStart, groupEnd)));
+      groupStart = groupEnd;
+    }
+    // All instances in the flattened list must have been consumed by the slices above.
+    Preconditions.checkState(groupStart == sortedInstances.size(),
+        "Assigned %s instances but expected %s", groupStart, sortedInstances.size());
+    return ImmutableMap.copyOf(assignment);
+  }
+
+  /**
+   * Build the ID of a virtual group as the group name, the group name splitter and the group index.
+   */
+  private static String computeVirtualGroupId(int groupIndex, String virtualGroupName) {
+    return virtualGroupName + VirtualTopologyGroupConstants.GROUP_NAME_SPLITTER + groupIndex;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/topology/VirtualGroupAssignmentAlgorithm.java b/helix-core/src/main/java/org/apache/helix/cloud/topology/VirtualGroupAssignmentAlgorithm.java
new file mode 100644
index 0000000..8d6c97f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/cloud/topology/VirtualGroupAssignmentAlgorithm.java
@@ -0,0 +1,38 @@
+package org.apache.helix.cloud.topology;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import java.util.Set;
+
+
+/**
+ * Strategy for distributing the instances of a zone mapping across a number of virtual topology groups.
+ */
+public interface VirtualGroupAssignmentAlgorithm {
+
+  /**
+   * Work out which instances belong to each virtual topology group.
+   *
+   * @param numGroups how many virtual groups to produce
+   * @param virtualGroupName name of the virtual group
+   * @param zoneMapping the current mapping from zoneId to the instanceIds in that zone
+   * @return a mapping from virtual group ID to the instanceIds assigned to that group
+   */
+  Map<String, Set<String>> computeAssignment(
+      int numGroups,
+      String virtualGroupName,
+      Map<String, Set<String>> zoneMapping);
+}
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index 75ca302..ee31e43 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -580,4 +580,18 @@ public final class HelixUtil {
         || (instancesMessages.getOrDefault(instance, Collections.emptyList()).stream()
         .anyMatch(Message::isParticipantStatusChangeType));
   }
+
+  /**
+   * Flatten a zone mapping into a single ordered list of instances: zones are visited in ascending
+   * zone ID order, and the instances of each zone are emitted in sorted order.
+   * @param zoneMapping mapping from zone ID to the instances in that zone.
+   * @return the instances of all zones, ordered by zone ID first and by instance within a zone.
+   */
+  public static List<String> sortAndFlattenZoneMapping(Map<String, Set<String>> zoneMapping) {
+    return zoneMapping.keySet().stream()
+        .sorted()
+        .map(zoneMapping::get)
+        .flatMap(instances -> instances.stream().sorted())
+        .collect(Collectors.toList());
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/cloud/virtualTopologyGroup/TestVirtualTopologyGroupAssignment.java b/helix-core/src/test/java/org/apache/helix/cloud/virtualTopologyGroup/TestVirtualTopologyGroupAssignment.java
new file mode 100644
index 0000000..54f4365
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/cloud/virtualTopologyGroup/TestVirtualTopologyGroupAssignment.java
@@ -0,0 +1,94 @@
+package org.apache.helix.cloud.virtualTopologyGroup;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Sets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.cloud.constants.VirtualTopologyGroupConstants;
+import org.apache.helix.cloud.topology.FifoVirtualGroupAssignmentAlgorithm;
+import org.apache.helix.cloud.topology.VirtualGroupAssignmentAlgorithm;
+import org.apache.helix.util.HelixUtil;
+import org.testng.Assert;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for flattening a zone mapping and for the virtual group assignment computed from it.
+ */
+public class TestVirtualTopologyGroupAssignment {
+
+  private static final String GROUP_NAME = "test_virtual_group";
+  // Expected flattening of the zone mapping built in prepare(): zones in key order (a, b, c, z),
+  // instances sorted within each zone.
+  private final List<String> _flattenExpected = Arrays.asList(
+      "1", "2", "3",
+      "4", "5", "6",
+      "7", "8", "9",
+      "a", "b", "c", "d");
+  private Map<String, Set<String>> _zoneMapping = new HashMap<>();
+
+  @BeforeTest
+  public void prepare() {
+    // Zones and instances are deliberately inserted out of order to exercise the sorting.
+    _zoneMapping = new HashMap<>();
+    _zoneMapping.put("c", Sets.newHashSet("9", "8", "7"));
+    _zoneMapping.put("a", Sets.newHashSet("2", "3", "1"));
+    _zoneMapping.put("z", Sets.newHashSet("b", "c", "d", "a"));
+    _zoneMapping.put("b", Sets.newHashSet("5", "4", "6"));
+  }
+
+  @Test
+  public void testFlattenZoneMapping() {
+    Assert.assertEquals(HelixUtil.sortAndFlattenZoneMapping(_zoneMapping), _flattenExpected);
+  }
+
+  @Test(dataProvider = "getMappingTests")
+  public void testAssignmentScheme(int numGroups, Map<String, Set<String>> expected,
+      VirtualGroupAssignmentAlgorithm algorithm) {
+    Assert.assertEquals(algorithm.computeAssignment(numGroups, GROUP_NAME, _zoneMapping), expected);
+  }
+
+  /**
+   * Supplies rows of {numGroups, expected assignment, algorithm} for testAssignmentScheme.
+   * The provider only builds data; all assertions are made by the test method, so a wrong
+   * assignment is reported as a failure of that test.
+   */
+  @DataProvider
+  public Object[][] getMappingTests() {
+    VirtualGroupAssignmentAlgorithm algorithm = FifoVirtualGroupAssignmentAlgorithm.getInstance();
+
+    // 13 instances over 3 groups: 13 = 4 * 3 + 1, so the first group gets 5 and the others 4.
+    Map<String, Set<String>> virtualMapping = new HashMap<>();
+    virtualMapping.put(computeVirtualGroupId(0), Sets.newHashSet("1", "2", "3", "4", "5"));
+    virtualMapping.put(computeVirtualGroupId(1), Sets.newHashSet("6", "7", "8", "9"));
+    virtualMapping.put(computeVirtualGroupId(2), Sets.newHashSet("a", "b", "c", "d"));
+
+    // 13 instances over 8 groups: 13 = 1 * 8 + 5, so the first five groups get 2 and the rest 1.
+    Map<String, Set<String>> virtualMapping2 = new HashMap<>();
+    virtualMapping2.put(computeVirtualGroupId(0), Sets.newHashSet("1", "2"));
+    virtualMapping2.put(computeVirtualGroupId(1), Sets.newHashSet("3", "4"));
+    virtualMapping2.put(computeVirtualGroupId(2), Sets.newHashSet("5", "6"));
+    virtualMapping2.put(computeVirtualGroupId(3), Sets.newHashSet("7", "8"));
+    virtualMapping2.put(computeVirtualGroupId(4), Sets.newHashSet("9", "a"));
+    virtualMapping2.put(computeVirtualGroupId(5), Sets.newHashSet("b"));
+    virtualMapping2.put(computeVirtualGroupId(6), Sets.newHashSet("c"));
+    virtualMapping2.put(computeVirtualGroupId(7), Sets.newHashSet("d"));
+
+    return new Object[][] {
+        {3, virtualMapping, algorithm},
+        {8, virtualMapping2, algorithm}
+    };
+  }
+
+  // Builds the expected virtual group ID: group name, the group name splitter, then the index.
+  private static String computeVirtualGroupId(int groupIndex) {
+    return GROUP_NAME + VirtualTopologyGroupConstants.GROUP_NAME_SPLITTER + groupIndex;
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
index 4570efd..9cbfc25 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
@@ -257,6 +257,22 @@ public class TestAssignableNode extends AbstractTestClusterModel {
         testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
 
     Assert.assertEquals(assignableNode.getFaultZone(), "2/testInstance");
+
+    // test fault zone not in top of topology
+    testClusterConfig = new ClusterConfig("testClusterConfigId");
+    testClusterConfig.setFaultZoneType("zone");
+    testClusterConfig.setTopologyAwareEnabled(true);
+    testClusterConfig.setTopology("/rack/zone/instance");
+
+    testInstanceConfig = new InstanceConfig("testInstanceConfigId");
+    testInstanceConfig.setDomain("rack=3, zone=2, instance=testInstanceConfigId");
+    instanceConfigMap = new HashMap<>();
+    instanceConfigMap.put(_testInstanceId, testInstanceConfig);
+    when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+    when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
+    assignableNode = new AssignableNode(testCache.getClusterConfig(),
+        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
+    Assert.assertEquals(assignableNode.getFaultZone(), "3/2");
   }
 
   @Test