You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2022/02/22 19:31:53 UTC
[helix] 02/04: Introduce VirtualTopologyGroup and its assignment logic with benchmark. (#1948)
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 199d997a18f7ca3f911a0a8cc219dae5009d5aa2
Author: Qi (Quincy) Qu <qq...@linkedin.com>
AuthorDate: Thu Feb 3 12:18:46 2022 -0800
Introduce VirtualTopologyGroup and its assignment logic with benchmark. (#1948)
* Cleanup unused assignment schemes and minor change.
* Further refactor and code cleanup.
---
.../constants/VirtualTopologyGroupConstants.java | 29 +++++++
.../FifoVirtualGroupAssignmentAlgorithm.java | 79 ++++++++++++++++++
.../topology/VirtualGroupAssignmentAlgorithm.java | 38 +++++++++
.../main/java/org/apache/helix/util/HelixUtil.java | 14 ++++
.../TestVirtualTopologyGroupAssignment.java | 94 ++++++++++++++++++++++
.../rebalancer/waged/model/TestAssignableNode.java | 16 ++++
6 files changed, 270 insertions(+)
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/constants/VirtualTopologyGroupConstants.java b/helix-core/src/main/java/org/apache/helix/cloud/constants/VirtualTopologyGroupConstants.java
new file mode 100644
index 0000000..a92e195
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/cloud/constants/VirtualTopologyGroupConstants.java
@@ -0,0 +1,29 @@
+package org.apache.helix.cloud.constants;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+public class VirtualTopologyGroupConstants {
+ public static final String GROUP_NAME = "virtualTopologyGroupName";
+ public static final String GROUP_NUMBER = "virtualTopologyGroupNumber";
+ public static final String GROUP_NAME_SPLITTER = "_";
+ public static final String PATH_NAME_SPLITTER = "/";
+ public static final String VIRTUAL_FAULT_ZONE_TYPE = "virtualZone";
+}
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/topology/FifoVirtualGroupAssignmentAlgorithm.java b/helix-core/src/main/java/org/apache/helix/cloud/topology/FifoVirtualGroupAssignmentAlgorithm.java
new file mode 100644
index 0000000..23da847
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/cloud/topology/FifoVirtualGroupAssignmentAlgorithm.java
@@ -0,0 +1,79 @@
+package org.apache.helix.cloud.topology;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.cloud.constants.VirtualTopologyGroupConstants;
+import org.apache.helix.util.HelixUtil;
+
+
+/**
+ * A strategy that densely assign virtual groups with input instance list, it doesn't move to the next one until
+ * the current one is filled.
+ * Given that instances.size = instancesPerGroup * numGroups + residuals,
+ * we break [residuals] into the first few groups, as a result each virtual group will have
+ * either [instancesPerGroup] or [instancesPerGroup + 1] instances.
+ */
+public class FifoVirtualGroupAssignmentAlgorithm implements VirtualGroupAssignmentAlgorithm {
+ private static final FifoVirtualGroupAssignmentAlgorithm _instance = new FifoVirtualGroupAssignmentAlgorithm();
+
+ private FifoVirtualGroupAssignmentAlgorithm() { }
+
+ public static FifoVirtualGroupAssignmentAlgorithm getInstance() {
+ return _instance;
+ }
+
+ @Override
+ public Map<String, Set<String>> computeAssignment(int numGroups, String virtualGroupName,
+ Map<String, Set<String>> zoneMapping) {
+ List<String> sortedInstances = HelixUtil.sortAndFlattenZoneMapping(zoneMapping);
+ Map<String, Set<String>> assignment = new HashMap<>();
+ // #instances = instancesPerGroupBase * numGroups + residuals
+ int instancesPerGroupBase = sortedInstances.size() / numGroups;
+ int residuals = sortedInstances.size() % numGroups; // assign across the first #residuals groups
+ List<Integer> numInstances = new ArrayList<>();
+ int instanceInd = 0;
+ for (int groupInd = 0; groupInd < numGroups; groupInd++) {
+ int num = groupInd < residuals
+ ? instancesPerGroupBase + 1
+ : instancesPerGroupBase;
+ String groupId = computeVirtualGroupId(groupInd, virtualGroupName);
+ assignment.put(groupId, new HashSet<>());
+ for (int i = 0; i < num; i++) {
+ assignment.get(groupId).add(sortedInstances.get(instanceInd));
+ instanceInd++;
+ }
+ numInstances.add(num);
+ }
+ Preconditions.checkState(numInstances.stream().mapToInt(Integer::intValue).sum() == sortedInstances.size());
+ return ImmutableMap.copyOf(assignment);
+ }
+
+ private static String computeVirtualGroupId(int groupIndex, String virtualGroupName) {
+ return virtualGroupName + VirtualTopologyGroupConstants.GROUP_NAME_SPLITTER + groupIndex;
+ }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/topology/VirtualGroupAssignmentAlgorithm.java b/helix-core/src/main/java/org/apache/helix/cloud/topology/VirtualGroupAssignmentAlgorithm.java
new file mode 100644
index 0000000..8d6c97f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/cloud/topology/VirtualGroupAssignmentAlgorithm.java
@@ -0,0 +1,38 @@
+package org.apache.helix.cloud.topology;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import java.util.Set;
+
+
+public interface VirtualGroupAssignmentAlgorithm {
+
+ /**
+ * Compute the assignment for each virtual topology group.
+ *
+ * @param numGroups number of the virtual groups
+ * @param virtualGroupName virtual group name
+ * @param zoneMapping current zone mapping from zoneId to instanceIds
+ * @return the assignment as mapping from virtual group ID to instanceIds
+ */
+ Map<String, Set<String>> computeAssignment(int numGroups, String virtualGroupName,
+ Map<String, Set<String>> zoneMapping);
+}
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index 75ca302..ee31e43 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -580,4 +580,18 @@ public final class HelixUtil {
|| (instancesMessages.getOrDefault(instance, Collections.emptyList()).stream()
.anyMatch(Message::isParticipantStatusChangeType));
}
+
+ /**
+ * Sort zoneMapping for each virtual group and flatten to a list.
+ * @param zoneMapping virtual group mapping.
+ * @return a list of instances sorted and flattened.
+ */
+ public static List<String> sortAndFlattenZoneMapping(Map<String, Set<String>> zoneMapping) {
+ return zoneMapping
+ .entrySet()
+ .stream()
+ .sorted(Map.Entry.comparingByKey())
+ .flatMap(entry -> entry.getValue().stream().sorted())
+ .collect(Collectors.toList());
+ }
}
diff --git a/helix-core/src/test/java/org/apache/helix/cloud/virtualTopologyGroup/TestVirtualTopologyGroupAssignment.java b/helix-core/src/test/java/org/apache/helix/cloud/virtualTopologyGroup/TestVirtualTopologyGroupAssignment.java
new file mode 100644
index 0000000..54f4365
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/cloud/virtualTopologyGroup/TestVirtualTopologyGroupAssignment.java
@@ -0,0 +1,94 @@
+package org.apache.helix.cloud.virtualTopologyGroup;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Sets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.cloud.constants.VirtualTopologyGroupConstants;
+import org.apache.helix.cloud.topology.FifoVirtualGroupAssignmentAlgorithm;
+import org.apache.helix.cloud.topology.VirtualGroupAssignmentAlgorithm;
+import org.apache.helix.util.HelixUtil;
+import org.testng.Assert;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class TestVirtualTopologyGroupAssignment {
+
+ private static final String GROUP_NAME = "test_virtual_group";
+ private final List<String> _flattenExpected = Arrays.asList(
+ "1", "2", "3",
+ "4", "5", "6",
+ "7", "8", "9",
+ "a", "b", "c", "d");
+ private Map<String, Set<String>> _zoneMapping = new HashMap<>();
+
+ @BeforeTest
+ public void prepare() {
+ _zoneMapping = new HashMap<>();
+ _zoneMapping.put("c", Sets.newHashSet("9", "8", "7"));
+ _zoneMapping.put("a", Sets.newHashSet("2", "3", "1"));
+ _zoneMapping.put("z", Sets.newHashSet("b", "c", "d", "a"));
+ _zoneMapping.put("b", Sets.newHashSet("5", "4", "6"));
+ }
+
+ @Test
+ public void testFlattenZoneMapping() {
+ Assert.assertEquals(HelixUtil.sortAndFlattenZoneMapping(_zoneMapping), _flattenExpected);
+ }
+
+ @Test(dataProvider = "getMappingTests")
+ public void testAssignmentScheme(int numGroups, Map<String, Set<String>> expected,
+ VirtualGroupAssignmentAlgorithm algorithm) {
+ Assert.assertEquals(algorithm.computeAssignment(numGroups, GROUP_NAME, _zoneMapping), expected);
+ }
+
+ @DataProvider
+ public Object[][] getMappingTests() {
+ Map<String, Set<String>> virtualMapping = new HashMap<>();
+ VirtualGroupAssignmentAlgorithm algorithm = FifoVirtualGroupAssignmentAlgorithm.getInstance();
+ virtualMapping.put(computeVirtualGroupId(0), Sets.newHashSet("1", "2", "3", "4", "5"));
+ virtualMapping.put(computeVirtualGroupId(1), Sets.newHashSet("6", "7", "8", "9"));
+ virtualMapping.put(computeVirtualGroupId(2), Sets.newHashSet("a", "b", "c", "d"));
+ Assert.assertEquals(algorithm.computeAssignment(3, GROUP_NAME, _zoneMapping),
+ virtualMapping);
+ Map<String, Set<String>> virtualMapping2 = new HashMap<>();
+ virtualMapping2.put(computeVirtualGroupId(0), Sets.newHashSet("1", "2"));
+ virtualMapping2.put(computeVirtualGroupId(1), Sets.newHashSet("3", "4"));
+ virtualMapping2.put(computeVirtualGroupId(2), Sets.newHashSet("5", "6"));
+ virtualMapping2.put(computeVirtualGroupId(3), Sets.newHashSet("7", "8"));
+ virtualMapping2.put(computeVirtualGroupId(4), Sets.newHashSet("9", "a"));
+ virtualMapping2.put(computeVirtualGroupId(5), Sets.newHashSet("b"));
+ virtualMapping2.put(computeVirtualGroupId(6), Sets.newHashSet("c"));
+ virtualMapping2.put(computeVirtualGroupId(7), Sets.newHashSet("d"));
+ return new Object[][] {
+ {3, virtualMapping, algorithm},
+ {8, virtualMapping2, algorithm}
+ };
+ }
+
+ private static String computeVirtualGroupId(int groupIndex) {
+ return GROUP_NAME + VirtualTopologyGroupConstants.GROUP_NAME_SPLITTER + groupIndex;
+ }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
index 4570efd..9cbfc25 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
@@ -257,6 +257,22 @@ public class TestAssignableNode extends AbstractTestClusterModel {
testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
Assert.assertEquals(assignableNode.getFaultZone(), "2/testInstance");
+
+ // test fault zone not in top of topology
+ testClusterConfig = new ClusterConfig("testClusterConfigId");
+ testClusterConfig.setFaultZoneType("zone");
+ testClusterConfig.setTopologyAwareEnabled(true);
+ testClusterConfig.setTopology("/rack/zone/instance");
+
+ testInstanceConfig = new InstanceConfig("testInstanceConfigId");
+ testInstanceConfig.setDomain("rack=3, zone=2, instance=testInstanceConfigId");
+ instanceConfigMap = new HashMap<>();
+ instanceConfigMap.put(_testInstanceId, testInstanceConfig);
+ when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+ when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
+ assignableNode = new AssignableNode(testCache.getClusterConfig(),
+ testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
+ Assert.assertEquals(assignableNode.getFaultZone(), "3/2");
}
@Test