You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2022/02/22 19:31:51 UTC

[helix] branch master updated (db99f56 -> de572a6)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git.


    from db99f56  Use final remaining capacity when computing weighted score (#1961)
     new 47ee384  Fix #1946 -- Refactor and move ClusterTopologyConfig
     new 199d997  Introduce VirtualTopologyGroup and its assignment logic with benchmark. (#1948)
     new 90a3832  Implement java API and utils for virtual topology group (#1935)
     new de572a6  Add rest endpoint for virtual topology group (#1958)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 ...der.java => VirtualTopologyGroupConstants.java} |  11 +-
 .../FifoVirtualGroupAssignmentAlgorithm.java       |  79 +++++++++
 .../topology/VirtualGroupAssignmentAlgorithm.java} |  26 +--
 .../controller/rebalancer/topology/Topology.java   | 120 ++++---------
 .../rebalancer/waged/model/AssignableNode.java     |   1 -
 .../apache/helix/model/ClusterTopologyConfig.java  | 101 +++++++++++
 .../org/apache/helix/model/InstanceConfig.java     |  21 ++-
 .../main/java/org/apache/helix/util/HelixUtil.java |  14 ++
 .../TestVirtualTopologyGroupAssignment.java        |  94 ++++++++++
 .../rebalancer/waged/model/TestAssignableNode.java |  16 ++
 .../helix/model/TestClusterTopologyConfig.java     |  84 +++++++++
 .../rest/server/resources/AbstractResource.java    |   1 +
 .../server/resources/helix/ClusterAccessor.java    |  25 +++
 .../service/VirtualTopologyGroupService.java       | 197 +++++++++++++++++++++
 .../helix/rest/server/TestClusterAccessor.java     | 156 ++++++++++++----
 .../service/TestVirtualTopologyGroupService.java   | 192 ++++++++++++++++++++
 16 files changed, 993 insertions(+), 145 deletions(-)
 copy helix-core/src/main/java/org/apache/helix/cloud/constants/{CloudProvider.java => VirtualTopologyGroupConstants.java} (64%)
 create mode 100644 helix-core/src/main/java/org/apache/helix/cloud/topology/FifoVirtualGroupAssignmentAlgorithm.java
 copy helix-core/src/main/java/org/apache/helix/{common/caches/AbstractDataSnapshot.java => cloud/topology/VirtualGroupAssignmentAlgorithm.java} (59%)
 create mode 100644 helix-core/src/main/java/org/apache/helix/model/ClusterTopologyConfig.java
 create mode 100644 helix-core/src/test/java/org/apache/helix/cloud/virtualTopologyGroup/TestVirtualTopologyGroupAssignment.java
 create mode 100644 helix-core/src/test/java/org/apache/helix/model/TestClusterTopologyConfig.java
 create mode 100644 helix-rest/src/main/java/org/apache/helix/rest/server/service/VirtualTopologyGroupService.java
 create mode 100644 helix-rest/src/test/java/org/apache/helix/rest/server/service/TestVirtualTopologyGroupService.java

[helix] 02/04: Introduce VirtualTopologyGroup and its assignment logic with benchmark. (#1948)

Posted by jx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 199d997a18f7ca3f911a0a8cc219dae5009d5aa2
Author: Qi (Quincy) Qu <qq...@linkedin.com>
AuthorDate: Thu Feb 3 12:18:46 2022 -0800

    Introduce VirtualTopologyGroup and its assignment logic with benchmark. (#1948)
    
    
    
    * Clean up unused assignment schemes and make minor changes.
    
    * Further refactor and code cleanup.
---
 .../constants/VirtualTopologyGroupConstants.java   | 29 +++++++
 .../FifoVirtualGroupAssignmentAlgorithm.java       | 79 ++++++++++++++++++
 .../topology/VirtualGroupAssignmentAlgorithm.java  | 38 +++++++++
 .../main/java/org/apache/helix/util/HelixUtil.java | 14 ++++
 .../TestVirtualTopologyGroupAssignment.java        | 94 ++++++++++++++++++++++
 .../rebalancer/waged/model/TestAssignableNode.java | 16 ++++
 6 files changed, 270 insertions(+)

diff --git a/helix-core/src/main/java/org/apache/helix/cloud/constants/VirtualTopologyGroupConstants.java b/helix-core/src/main/java/org/apache/helix/cloud/constants/VirtualTopologyGroupConstants.java
new file mode 100644
index 0000000..a92e195
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/cloud/constants/VirtualTopologyGroupConstants.java
@@ -0,0 +1,29 @@
+package org.apache.helix.cloud.constants;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+/**
+ * String constants shared by the virtual topology group feature.
+ */
+public class VirtualTopologyGroupConstants {
+  /** Topology / domain key under which an instance's virtual group is recorded; also used as the fault zone type. */
+  public static final String VIRTUAL_FAULT_ZONE_TYPE = "virtualZone";
+
+  /** Custom-field key carrying the virtual topology group name. */
+  public static final String GROUP_NAME = "virtualTopologyGroupName";
+  /** Custom-field key carrying the number of virtual topology groups. */
+  public static final String GROUP_NUMBER = "virtualTopologyGroupNumber";
+
+  /** Separator between the group name and the group index in a virtual group id, e.g. {@code name_0}. */
+  public static final String GROUP_NAME_SPLITTER = "_";
+  /** Separator between the levels of a topology path string. */
+  public static final String PATH_NAME_SPLITTER = "/";
+}
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/topology/FifoVirtualGroupAssignmentAlgorithm.java b/helix-core/src/main/java/org/apache/helix/cloud/topology/FifoVirtualGroupAssignmentAlgorithm.java
new file mode 100644
index 0000000..23da847
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/cloud/topology/FifoVirtualGroupAssignmentAlgorithm.java
@@ -0,0 +1,79 @@
+package org.apache.helix.cloud.topology;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.cloud.constants.VirtualTopologyGroupConstants;
+import org.apache.helix.util.HelixUtil;
+
+
+/**
+ * A strategy that densely assigns virtual groups from the input instance list: it does not move on to the next
+ * group until the current one is filled.
+ * Given that instances.size = instancesPerGroup * numGroups + residuals,
+ * the [residuals] extra instances are spread over the first few groups, so each virtual group receives
+ * either [instancesPerGroup] or [instancesPerGroup + 1] instances.
+ * Instances are consumed in the order returned by {@link HelixUtil#sortAndFlattenZoneMapping(Map)}
+ * (zones sorted by id, instances sorted within each zone), so the result is deterministic for a given input.
+ */
+public class FifoVirtualGroupAssignmentAlgorithm implements VirtualGroupAssignmentAlgorithm {
+  private static final FifoVirtualGroupAssignmentAlgorithm _instance = new FifoVirtualGroupAssignmentAlgorithm();
+
+  private FifoVirtualGroupAssignmentAlgorithm() { }
+
+  /**
+   * @return the shared singleton instance of this algorithm.
+   */
+  public static FifoVirtualGroupAssignmentAlgorithm getInstance() {
+    return _instance;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @throws IllegalArgumentException if {@code numGroups} is not positive
+   */
+  @Override
+  public Map<String, Set<String>> computeAssignment(int numGroups, String virtualGroupName,
+      Map<String, Set<String>> zoneMapping) {
+    // Fail fast with a clear message: numGroups == 0 would otherwise throw ArithmeticException below,
+    // and a negative value would only surface as an opaque checkState failure.
+    Preconditions.checkArgument(numGroups > 0, "Number of virtual groups should be positive, got: %s", numGroups);
+    List<String> sortedInstances = HelixUtil.sortAndFlattenZoneMapping(zoneMapping);
+    Map<String, Set<String>> assignment = new HashMap<>();
+    // #instances = instancesPerGroupBase * numGroups + residuals
+    int instancesPerGroupBase = sortedInstances.size() / numGroups;
+    // the first #residuals groups each take one extra instance
+    int residuals = sortedInstances.size() % numGroups;
+    int instanceInd = 0;
+    for (int groupInd = 0; groupInd < numGroups; groupInd++) {
+      int num = groupInd < residuals ? instancesPerGroupBase + 1 : instancesPerGroupBase;
+      Set<String> groupInstances = new HashSet<>();
+      for (int i = 0; i < num; i++) {
+        groupInstances.add(sortedInstances.get(instanceInd++));
+      }
+      assignment.put(computeVirtualGroupId(groupInd, virtualGroupName), groupInstances);
+    }
+    // every instance must have been consumed exactly once
+    Preconditions.checkState(instanceInd == sortedInstances.size());
+    return ImmutableMap.copyOf(assignment);
+  }
+
+  /**
+   * Build the virtual group id as {@code <virtualGroupName>_<groupIndex>}.
+   */
+  private static String computeVirtualGroupId(int groupIndex, String virtualGroupName) {
+    return virtualGroupName + VirtualTopologyGroupConstants.GROUP_NAME_SPLITTER + groupIndex;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/cloud/topology/VirtualGroupAssignmentAlgorithm.java b/helix-core/src/main/java/org/apache/helix/cloud/topology/VirtualGroupAssignmentAlgorithm.java
new file mode 100644
index 0000000..8d6c97f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/cloud/topology/VirtualGroupAssignmentAlgorithm.java
@@ -0,0 +1,38 @@
+package org.apache.helix.cloud.topology;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import java.util.Set;
+
+
+public interface VirtualGroupAssignmentAlgorithm {
+
+  /**
+   * Compute the assignment for each virtual topology group.
+   *
+   * @param numGroups number of the virtual groups
+   * @param virtualGroupName virtual group name
+   * @param zoneMapping current zone mapping from zoneId to instanceIds
+   * @return the assignment as mapping from virtual group ID to instanceIds
+   */
+  Map<String, Set<String>> computeAssignment(int numGroups, String virtualGroupName,
+      Map<String, Set<String>> zoneMapping);
+}
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index 75ca302..ee31e43 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -580,4 +580,18 @@ public final class HelixUtil {
         || (instancesMessages.getOrDefault(instance, Collections.emptyList()).stream()
         .anyMatch(Message::isParticipantStatusChangeType));
   }
+
+  /**
+   * Flatten a zone mapping into one deterministically ordered instance list.
+   * Zones are visited in natural (lexicographic) order of their ids, and each zone's instances are
+   * emitted in natural order before moving to the next zone.
+   * @param zoneMapping mapping from zone id to the instances in that zone.
+   * @return all instances, grouped by sorted zone id and sorted within each zone.
+   */
+  public static List<String> sortAndFlattenZoneMapping(Map<String, Set<String>> zoneMapping) {
+    return zoneMapping.keySet()
+        .stream()
+        .sorted()
+        .flatMap(zone -> zoneMapping.get(zone).stream().sorted())
+        .collect(Collectors.toList());
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/cloud/virtualTopologyGroup/TestVirtualTopologyGroupAssignment.java b/helix-core/src/test/java/org/apache/helix/cloud/virtualTopologyGroup/TestVirtualTopologyGroupAssignment.java
new file mode 100644
index 0000000..54f4365
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/cloud/virtualTopologyGroup/TestVirtualTopologyGroupAssignment.java
@@ -0,0 +1,94 @@
+package org.apache.helix.cloud.virtualTopologyGroup;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Sets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.cloud.constants.VirtualTopologyGroupConstants;
+import org.apache.helix.cloud.topology.FifoVirtualGroupAssignmentAlgorithm;
+import org.apache.helix.cloud.topology.VirtualGroupAssignmentAlgorithm;
+import org.apache.helix.util.HelixUtil;
+import org.testng.Assert;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class TestVirtualTopologyGroupAssignment {
+
+  private static final String GROUP_NAME = "test_virtual_group";
+  // expected order: zones sorted by id (a, b, c, z), instances sorted within each zone
+  private final List<String> _flattenExpected = Arrays.asList(
+      "1", "2", "3",
+      "4", "5", "6",
+      "7", "8", "9",
+      "a", "b", "c", "d");
+  private final Map<String, Set<String>> _zoneMapping = new HashMap<>();
+
+  @BeforeTest
+  public void prepare() {
+    // zones and instances are deliberately inserted out of order to exercise the sorting
+    _zoneMapping.clear();
+    _zoneMapping.put("c", Sets.newHashSet("9", "8", "7"));
+    _zoneMapping.put("a", Sets.newHashSet("2", "3", "1"));
+    _zoneMapping.put("z", Sets.newHashSet("b", "c", "d", "a"));
+    _zoneMapping.put("b", Sets.newHashSet("5", "4", "6"));
+  }
+
+  @Test
+  public void testFlattenZoneMapping() {
+    Assert.assertEquals(HelixUtil.sortAndFlattenZoneMapping(_zoneMapping), _flattenExpected);
+  }
+
+  @Test(dataProvider = "getMappingTests")
+  public void testAssignmentScheme(int numGroups, Map<String, Set<String>> expected,
+      VirtualGroupAssignmentAlgorithm algorithm) {
+    Assert.assertEquals(algorithm.computeAssignment(numGroups, GROUP_NAME, _zoneMapping), expected);
+  }
+
+  /**
+   * Expected assignments only; the actual computation and assertion happen in {@link #testAssignmentScheme}.
+   */
+  @DataProvider
+  public Object[][] getMappingTests() {
+    VirtualGroupAssignmentAlgorithm algorithm = FifoVirtualGroupAssignmentAlgorithm.getInstance();
+    // 13 instances / 3 groups: the single residual instance goes to group 0
+    Map<String, Set<String>> virtualMapping = new HashMap<>();
+    virtualMapping.put(computeVirtualGroupId(0), Sets.newHashSet("1", "2", "3", "4", "5"));
+    virtualMapping.put(computeVirtualGroupId(1), Sets.newHashSet("6", "7", "8", "9"));
+    virtualMapping.put(computeVirtualGroupId(2), Sets.newHashSet("a", "b", "c", "d"));
+    // 13 instances / 8 groups: the 5 residual instances go to groups 0-4
+    Map<String, Set<String>> virtualMapping2 = new HashMap<>();
+    virtualMapping2.put(computeVirtualGroupId(0), Sets.newHashSet("1", "2"));
+    virtualMapping2.put(computeVirtualGroupId(1), Sets.newHashSet("3", "4"));
+    virtualMapping2.put(computeVirtualGroupId(2), Sets.newHashSet("5", "6"));
+    virtualMapping2.put(computeVirtualGroupId(3), Sets.newHashSet("7", "8"));
+    virtualMapping2.put(computeVirtualGroupId(4), Sets.newHashSet("9", "a"));
+    virtualMapping2.put(computeVirtualGroupId(5), Sets.newHashSet("b"));
+    virtualMapping2.put(computeVirtualGroupId(6), Sets.newHashSet("c"));
+    virtualMapping2.put(computeVirtualGroupId(7), Sets.newHashSet("d"));
+    // one group per instance: each group holds exactly one instance, in flattened order
+    Map<String, Set<String>> virtualMapping3 = new HashMap<>();
+    for (int i = 0; i < _flattenExpected.size(); i++) {
+      virtualMapping3.put(computeVirtualGroupId(i), Sets.newHashSet(_flattenExpected.get(i)));
+    }
+    return new Object[][] {
+        {3, virtualMapping, algorithm},
+        {8, virtualMapping2, algorithm},
+        {_flattenExpected.size(), virtualMapping3, algorithm}
+    };
+  }
+
+  private static String computeVirtualGroupId(int groupIndex) {
+    return GROUP_NAME + VirtualTopologyGroupConstants.GROUP_NAME_SPLITTER + groupIndex;
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
index 4570efd..9cbfc25 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
@@ -257,6 +257,22 @@ public class TestAssignableNode extends AbstractTestClusterModel {
         testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
 
     Assert.assertEquals(assignableNode.getFaultZone(), "2/testInstance");
+
+    // test fault zone not in top of topology
+    testClusterConfig = new ClusterConfig("testClusterConfigId");
+    testClusterConfig.setFaultZoneType("zone");
+    testClusterConfig.setTopologyAwareEnabled(true);
+    testClusterConfig.setTopology("/rack/zone/instance");
+
+    testInstanceConfig = new InstanceConfig("testInstanceConfigId");
+    testInstanceConfig.setDomain("rack=3, zone=2, instance=testInstanceConfigId");
+    instanceConfigMap = new HashMap<>();
+    instanceConfigMap.put(_testInstanceId, testInstanceConfig);
+    when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+    when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
+    assignableNode = new AssignableNode(testCache.getClusterConfig(),
+        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
+    Assert.assertEquals(assignableNode.getFaultZone(), "3/2");
   }
 
   @Test

[helix] 03/04: Implement java API and utils for virtual topology group (#1935)

Posted by jx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 90a3832a0f9811dac0007031ca33fbe4e0bd15ec
Author: Qi (Quincy) Qu <qq...@linkedin.com>
AuthorDate: Tue Feb 8 16:53:40 2022 -0500

    Implement java API and utils for virtual topology group (#1935)
    
    Add comment to VirtualTopologyGroupService.
---
 .../constants/VirtualTopologyGroupConstants.java   |   1 +
 .../rebalancer/waged/model/AssignableNode.java     |   1 -
 .../org/apache/helix/model/InstanceConfig.java     |  21 ++-
 .../service/VirtualTopologyGroupService.java       | 197 +++++++++++++++++++++
 .../service/TestVirtualTopologyGroupService.java   | 192 ++++++++++++++++++++
 5 files changed, 407 insertions(+), 5 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/cloud/constants/VirtualTopologyGroupConstants.java b/helix-core/src/main/java/org/apache/helix/cloud/constants/VirtualTopologyGroupConstants.java
index a92e195..d97173a 100644
--- a/helix-core/src/main/java/org/apache/helix/cloud/constants/VirtualTopologyGroupConstants.java
+++ b/helix-core/src/main/java/org/apache/helix/cloud/constants/VirtualTopologyGroupConstants.java
@@ -23,6 +23,7 @@ package org.apache.helix.cloud.constants;
 public class VirtualTopologyGroupConstants {
   public static final String GROUP_NAME = "virtualTopologyGroupName";
   public static final String GROUP_NUMBER = "virtualTopologyGroupNumber";
+  public static final String AUTO_MAINTENANCE_MODE_DISABLED = "autoMaintenanceModeDisabled";
   public static final String GROUP_NAME_SPLITTER = "_";
   public static final String PATH_NAME_SPLITTER = "/";
   public static final String VIRTUAL_FAULT_ZONE_TYPE = "virtualZone";
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index aae2328..e29052a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -19,7 +19,6 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index 143b610..3ab3ea0 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -63,6 +63,8 @@ public class InstanceConfig extends HelixProperty {
   public static final int WEIGHT_NOT_SET = -1;
   public static final int MAX_CONCURRENT_TASK_NOT_SET = -1;
   private static final int TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;
+  private static final String DOMAIN_FIELD_SPLITTER = ",";
+  private static final String DOMAIN_VALUE_JOINER = "=";
 
   private static final Logger _logger = LoggerFactory.getLogger(InstanceConfig.class.getName());
 
@@ -156,10 +158,9 @@ public class InstanceConfig extends HelixProperty {
     if (domain == null || domain.isEmpty()) {
       return domainAsMap;
     }
-
-    String[] pathPairs = domain.trim().split(",");
+    String[] pathPairs = domain.trim().split(DOMAIN_FIELD_SPLITTER);
     for (String pair : pathPairs) {
-      String[] values = pair.split("=");
+      String[] values = pair.split(DOMAIN_VALUE_JOINER);
       if (values.length != 2 || values[0].isEmpty() || values[1].isEmpty()) {
         throw new IllegalArgumentException(
             String.format("Domain-Value pair %s is not valid.", pair));
@@ -173,12 +174,24 @@ public class InstanceConfig extends HelixProperty {
   /**
    * Domain represents a hierarchy identifier for an instance.
    * Example:  "cluster=myCluster,zone=myZone1,rack=myRack,host=hostname,instance=instance001".
-   * @return
    */
   public void setDomain(String domain) {
     _record.setSimpleField(InstanceConfigProperty.DOMAIN.name(), domain);
   }
 
+  /**
+   * Set domain from its map representation.
+   * Entries are serialized as comma-separated {@code key=value} pairs in the map's iteration order,
+   * the same format that {@link #getDomainAsMap()} parses.
+   * @param domainMap domain as a map; keys and values must be non-null, non-empty, and must not contain
+   *                  {@code ","} or {@code "="}
+   * @throws IllegalArgumentException if an entry would produce a domain string that cannot be parsed back
+   */
+  public void setDomain(Map<String, String> domainMap) {
+    String domain = domainMap
+        .entrySet()
+        .stream()
+        .map(InstanceConfig::toDomainPair)
+        .collect(Collectors.joining(DOMAIN_FIELD_SPLITTER));
+    setDomain(domain);
+  }
+
+  /**
+   * Render one domain entry as {@code key=value}, rejecting tokens that would make the resulting
+   * domain string unparsable (the parser splits on "," and "=" and rejects empty parts).
+   */
+  private static String toDomainPair(Map.Entry<String, String> entry) {
+    String key = entry.getKey();
+    String value = entry.getValue();
+    if (!isValidDomainToken(key) || !isValidDomainToken(value)) {
+      throw new IllegalArgumentException(
+          String.format("Domain-Value pair %s=%s is not valid.", key, value));
+    }
+    return key + DOMAIN_VALUE_JOINER + value;
+  }
+
+  /** Whether a domain key or value can be safely embedded in the serialized domain string. */
+  private static boolean isValidDomainToken(String token) {
+    return token != null && !token.isEmpty() && !token.contains(DOMAIN_FIELD_SPLITTER)
+        && !token.contains(DOMAIN_VALUE_JOINER);
+  }
+
   public int getWeight() {
     String w = _record.getSimpleField(InstanceConfigProperty.INSTANCE_WEIGHT.name());
     if (w != null) {
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/VirtualTopologyGroupService.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/VirtualTopologyGroupService.java
new file mode 100644
index 0000000..997b880
--- /dev/null
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/VirtualTopologyGroupService.java
@@ -0,0 +1,197 @@
+package org.apache.helix.rest.server.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.cloud.constants.VirtualTopologyGroupConstants;
+import org.apache.helix.cloud.topology.FifoVirtualGroupAssignmentAlgorithm;
+import org.apache.helix.cloud.topology.VirtualGroupAssignmentAlgorithm;
+import org.apache.helix.model.CloudConfig;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ClusterTopologyConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.rest.server.json.cluster.ClusterTopology;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.zkclient.DataUpdater;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Service for virtual topology group.
+ * It's a virtualization layer on top of physical fault domain and topology in cloud environments.
+ * The service computes the mapping from virtual group to instances based on the current cluster topology and writes
+ * the result to the cluster config and to the instance configs of all assigned instances.
+ */
+public class VirtualTopologyGroupService {
+  private static final Logger LOG = LoggerFactory.getLogger(VirtualTopologyGroupService.class);
+
+  private final HelixAdmin _helixAdmin;
+  private final ClusterService _clusterService;
+  private final ConfigAccessor _configAccessor;
+  private final HelixDataAccessor _dataAccessor;
+  private final VirtualGroupAssignmentAlgorithm _assignmentAlgorithm;
+
+  public VirtualTopologyGroupService(HelixAdmin helixAdmin, ClusterService clusterService,
+      ConfigAccessor configAccessor, HelixDataAccessor dataAccessor) {
+    _helixAdmin = helixAdmin;
+    _clusterService = clusterService;
+    _configAccessor = configAccessor;
+    _dataAccessor = dataAccessor;
+    _assignmentAlgorithm = FifoVirtualGroupAssignmentAlgorithm.getInstance();
+  }
+
+  /**
+   * Add virtual topology group for a cluster.
+   * This includes calculating the virtual group assignment for all instances in the cluster then updating instance
+   * configs and the cluster config. We override {@link ClusterConfig.ClusterConfigProperty#TOPOLOGY} and
+   * {@link ClusterConfig.ClusterConfigProperty#FAULT_ZONE_TYPE} for cluster config, and add new field to
+   * {@link InstanceConfig.InstanceConfigProperty#DOMAIN} that contains virtual topology group information.
+   * This is only supported for cloud environments. Cluster is expected to be in maintenance mode during config change.
+   * If the config update fails after the service entered maintenance mode automatically, the cluster is NOT taken
+   * out of maintenance mode, leaving the partially applied change for an operator to inspect.
+   * @param clusterName the cluster name.
+   * @param customFields custom fields, {@link VirtualTopologyGroupConstants#GROUP_NAME}
+   *                     and {@link VirtualTopologyGroupConstants#GROUP_NUMBER} are required,
+   *                     {@link VirtualTopologyGroupConstants#AUTO_MAINTENANCE_MODE_DISABLED} is optional.
+   *                     -- if set true, the cluster will NOT automatically enter/exit maintenance mode during this API call;
+   *                     -- if set false or not set, the cluster will automatically enter maintenance mode and exit after
+   *                     the call succeeds. It won't proceed if the cluster is already in maintenance mode.
+   *                     In either case, the cluster must be in maintenance mode before config change.
+   * @throws HelixException if cloud is not enabled, the cluster config is missing, or an instance config update fails
+   * @throws IllegalArgumentException if the group name or number is missing or invalid, or the number of groups
+   *                                  exceeds the number of instances
+   * @throws IllegalStateException if topology-aware rebalance is disabled or the maintenance mode precondition fails
+   */
+  public void addVirtualTopologyGroup(String clusterName, Map<String, String> customFields) {
+    // validation
+    CloudConfig cloudConfig = _configAccessor.getCloudConfig(clusterName);
+    if (cloudConfig == null || !cloudConfig.isCloudEnabled()) {
+      throw new HelixException(
+          "Cloud is not enabled, addVirtualTopologyGroup is not allowed to run in non-cloud environment.");
+    }
+    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName);
+    if (clusterConfig == null) {
+      throw new HelixException("Cluster config is not found for cluster " + clusterName);
+    }
+    Preconditions.checkState(clusterConfig.isTopologyAwareEnabled(),
+        "Topology-aware rebalance is not enabled in cluster " + clusterName);
+    String groupName = customFields.get(VirtualTopologyGroupConstants.GROUP_NAME);
+    String groupNumberStr = customFields.get(VirtualTopologyGroupConstants.GROUP_NUMBER);
+    Preconditions.checkArgument(!StringUtils.isEmpty(groupName), "virtualTopologyGroupName cannot be empty!");
+    Preconditions.checkArgument(!StringUtils.isEmpty(groupNumberStr), "virtualTopologyGroupNumber cannot be empty!");
+    int numGroups = 0;
+    try {
+      numGroups = Integer.parseInt(groupNumberStr);
+      Preconditions.checkArgument(numGroups > 0, "Number of virtual groups should be positive.");
+    } catch (NumberFormatException ex) {
+      throw new IllegalArgumentException("virtualTopologyGroupNumber " + groupNumberStr + " is not an integer.", ex);
+    }
+    LOG.info("Computing virtual topology group for cluster {} with param {}", clusterName, customFields);
+
+    // compute group assignment
+    ClusterTopology clusterTopology = _clusterService.getClusterTopology(clusterName);
+    Preconditions.checkArgument(numGroups <= clusterTopology.getAllInstances().size(),
+        "Number of virtual groups cannot be greater than the number of instances.");
+    Map<String, Set<String>> assignment =
+        _assignmentAlgorithm.computeAssignment(numGroups, groupName, clusterTopology.toZoneMapping());
+
+    boolean autoMaintenanceModeDisabled = Boolean.parseBoolean(
+        customFields.getOrDefault(VirtualTopologyGroupConstants.AUTO_MAINTENANCE_MODE_DISABLED, "false"));
+    // if auto mode is NOT disabled, let service enter maintenance mode and exit after the API succeeds.
+    if (!autoMaintenanceModeDisabled) {
+      Preconditions.checkState(!_helixAdmin.isInMaintenanceMode(clusterName),
+          "This operation is not allowed if cluster is already in maintenance mode before the API call. "
+              + "Please set autoMaintenanceModeDisabled=true if this is intended.");
+      _helixAdmin.manuallyEnableMaintenanceMode(clusterName, true,
+          "Enable maintenanceMode for virtual topology group change.", customFields);
+    }
+    Preconditions.checkState(_helixAdmin.isInMaintenanceMode(clusterName),
+        "Cluster is not in maintenance mode. This is required for virtual topology group setting. "
+            + "Please set autoMaintenanceModeDisabled=false (default) to let the cluster enter maintenance mode automatically, "
+            + "or use autoMaintenanceModeDisabled=true and control cluster maintenance mode in client side.");
+
+    // on failure this throws before maintenance mode is exited, keeping the cluster frozen for inspection
+    updateConfigs(clusterName, clusterConfig, assignment);
+    if (!autoMaintenanceModeDisabled) {
+      _helixAdmin.manuallyEnableMaintenanceMode(clusterName, false,
+          "Disable maintenanceMode after virtual topology group change.", customFields);
+    }
+  }
+
+  /**
+   * Write the virtual group of every assigned instance into its instance config, then switch the cluster config
+   * topology and fault zone type to the virtual zone.
+   */
+  private void updateConfigs(String clusterName, ClusterConfig clusterConfig, Map<String, Set<String>> assignment) {
+    List<String> zkPaths = new ArrayList<>();
+    List<DataUpdater<ZNRecord>> updaters = new ArrayList<>();
+    createInstanceConfigUpdater(clusterName, assignment).forEach((zkPath, updater) -> {
+      zkPaths.add(zkPath);
+      updaters.add(updater);
+    });
+    // update instance config; config znodes are persistent, so never request an ephemeral node here
+    boolean[] results = _dataAccessor.updateChildren(zkPaths, updaters, AccessOption.PERSISTENT);
+    for (int i = 0; i < results.length; i++) {
+      if (!results[i]) {
+        throw new HelixException("Failed to update instance config for path " + zkPaths.get(i));
+      }
+    }
+    // update cluster config
+    String virtualTopologyString = computeVirtualTopologyString(clusterConfig);
+    clusterConfig.setTopology(virtualTopologyString);
+    clusterConfig.setFaultZoneType(VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE);
+    _configAccessor.updateClusterConfig(clusterName, clusterConfig);
+    LOG.info("Successfully update instance and cluster config for {}", clusterName);
+  }
+
+  /**
+   * Build the virtual topology path {@code /virtualZone/<endNodeType>}, keeping the end node type of the
+   * cluster's current topology.
+   */
+  @VisibleForTesting
+  static String computeVirtualTopologyString(ClusterConfig clusterConfig) {
+    ClusterTopologyConfig clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
+    String endNodeType = clusterTopologyConfig.getEndNodeType();
+    String[] splits = new String[] {"", VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE, endNodeType};
+    return String.join(VirtualTopologyGroupConstants.PATH_NAME_SPLITTER, splits);
+  }
+
+  /**
+   * Create updater for instance config for async update.
+   * @param clusterName cluster name of the instances.
+   * @param assignment virtual group assignment.
+   * @return a map from instance zkPath to its {@link DataUpdater} to update.
+   */
+  @VisibleForTesting
+  static Map<String, DataUpdater<ZNRecord>> createInstanceConfigUpdater(
+      String clusterName, Map<String, Set<String>> assignment) {
+    Map<String, DataUpdater<ZNRecord>> updaters = new HashMap<>();
+    for (Map.Entry<String, Set<String>> entry : assignment.entrySet()) {
+      String virtualGroup = entry.getKey();
+      for (String instanceName : entry.getValue()) {
+        String path = PropertyPathBuilder.instanceConfig(clusterName, instanceName);
+        updaters.put(path, currentData -> {
+          // null means the instance config disappeared after the topology was read; fail with a clear message
+          if (currentData == null) {
+            throw new HelixException("Instance config does not exist for path " + path);
+          }
+          InstanceConfig instanceConfig = new InstanceConfig(currentData);
+          Map<String, String> domainMap = instanceConfig.getDomainAsMap();
+          domainMap.put(VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE, virtualGroup);
+          instanceConfig.setDomain(domainMap);
+          return instanceConfig.getRecord();
+        });
+      }
+    }
+    return updaters;
+  }
+}
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestVirtualTopologyGroupService.java b/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestVirtualTopologyGroupService.java
new file mode 100644
index 0000000..ab1c53f
--- /dev/null
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestVirtualTopologyGroupService.java
@@ -0,0 +1,192 @@
+package org.apache.helix.rest.server.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.cloud.azure.AzureConstants;
+import org.apache.helix.cloud.constants.CloudProvider;
+import org.apache.helix.model.CloudConfig;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.rest.server.json.cluster.ClusterTopology;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.zkclient.DataUpdater;
+import org.testng.Assert;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.apache.helix.cloud.constants.VirtualTopologyGroupConstants.*;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Unit tests for {@link VirtualTopologyGroupService}, run against mocked config/data accessors,
+ * a mocked {@link HelixAdmin} and a mocked {@link ClusterService}.
+ */
+public class TestVirtualTopologyGroupService {
+  // TEST_CLUSTER only names the zk paths of the precomputed instance config updaters.
+  private static final String TEST_CLUSTER = "Test_Cluster";
+  // TEST_CLUSTER0 is the topology-aware, cloud-enabled cluster used by most tests.
+  private static final String TEST_CLUSTER0 = "TestCluster_0";
+  // TEST_CLUSTER1 has a cluster config but no stubbed cloud config.
+  private static final String TEST_CLUSTER1 = "TestCluster_1";
+
+  // These mocks are shared by every test method in this class: stubbing and recorded invocations
+  // carry over from one method to the next.
+  private final ConfigAccessor _configAccessor = mock(ConfigAccessor.class);
+  private final HelixDataAccessor _dataAccessor = mock(HelixDataAccessor.class);
+  private InstanceConfig _instanceConfig0;
+  private InstanceConfig _instanceConfig1;
+  private InstanceConfig _instanceConfig2;
+  private Map<String, DataUpdater<ZNRecord>> _updaterMap;
+  private HelixAdmin _helixAdmin;
+  private VirtualTopologyGroupService _service;
+
+  @BeforeTest
+  public void prepare() {
+    // Three instances: instance_0 and instance_1 in zone0, instance_2 in zone1. They are assigned to
+    // two virtual groups, and the resulting updaters are checked by testInstanceConfigUpdater.
+    Map<String, Set<String>> assignment = new HashMap<>();
+    _instanceConfig0 = new InstanceConfig("instance_0");
+    _instanceConfig0.setDomain("helixZoneId=zone0");
+    _instanceConfig1 = new InstanceConfig("instance_1");
+    _instanceConfig1.setDomain("helixZoneId=zone0");
+    _instanceConfig2 = new InstanceConfig("instance_2");
+    _instanceConfig2.setDomain("helixZoneId=zone1");
+
+    assignment.put("virtual_group_0", ImmutableSet.of("instance_0", "instance_1"));
+    assignment.put("virtual_group_1", ImmutableSet.of("instance_2"));
+    _updaterMap = VirtualTopologyGroupService.createInstanceConfigUpdater(TEST_CLUSTER, assignment);
+
+    // TEST_CLUSTER0: topology-aware cluster using the Azure fault zone type and topology.
+    ClusterConfig clusterConfig = new ClusterConfig(TEST_CLUSTER0);
+    clusterConfig.setFaultZoneType(AzureConstants.AZURE_FAULT_ZONE_TYPE);
+    clusterConfig.setTopology(AzureConstants.AZURE_TOPOLOGY);
+    clusterConfig.setTopologyAwareEnabled(true);
+    when(_configAccessor.getClusterConfig(TEST_CLUSTER0)).thenReturn(clusterConfig);
+
+    // Cloud (Azure) is enabled for TEST_CLUSTER0 only.
+    CloudConfig.Builder cloudConfigBuilder = new CloudConfig.Builder();
+    cloudConfigBuilder.setCloudEnabled(true);
+    cloudConfigBuilder.setCloudProvider(CloudProvider.AZURE);
+    cloudConfigBuilder.setCloudID("TestID");
+    CloudConfig cloudConfig = cloudConfigBuilder.build();
+    when(_configAccessor.getCloudConfig(TEST_CLUSTER0)).thenReturn(cloudConfig);
+
+    // Default stub: every cluster reports being in maintenance mode. testMaintenanceModeCheckAfter
+    // overrides this temporarily.
+    _helixAdmin = mock(HelixAdmin.class);
+    when(_helixAdmin.isInMaintenanceMode(anyString())).thenReturn(true);
+
+    // Batched instance config updates report success.
+    // NOTE(review): only two results are returned although three instances are updated; confirm
+    // this matches how the service consumes the result array.
+    boolean[] results = new boolean[2];
+    results[0] = results[1] = true;
+    when(_dataAccessor.updateChildren(anyList(), anyList(), anyInt())).thenReturn(results);
+    ClusterService clusterService = mock(ClusterService.class);
+    when(clusterService.getClusterTopology(anyString())).thenReturn(prepareClusterTopology());
+    _service = new VirtualTopologyGroupService(_helixAdmin, clusterService, _configAccessor, _dataAccessor);
+  }
+
+  @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "Cloud is not enabled.*")
+  public void testClusterCloudConfigSetup() {
+    // No cloud config is stubbed for TEST_CLUSTER1, so the mock returns null for it.
+    ClusterConfig clusterConfig1 = new ClusterConfig(TEST_CLUSTER1);
+    when(_configAccessor.getClusterConfig(TEST_CLUSTER1)).thenReturn(clusterConfig1);
+    _service.addVirtualTopologyGroup(TEST_CLUSTER1, ImmutableMap.of(GROUP_NAME, "test-group", GROUP_NUMBER, "2"));
+  }
+
+  @Test
+  public void testVirtualTopologyGroupService() {
+    // Happy path: AUTO_MAINTENANCE_MODE_DISABLED=true while the admin mock reports maintenance mode.
+    // NOTE(review): times(1) counts every invocation recorded on the shared mocks, so this relies on
+    // no other test method in this class reaching these calls first (test-order dependent).
+    _service.addVirtualTopologyGroup(TEST_CLUSTER0, ImmutableMap.of(
+        GROUP_NAME, "test-group", GROUP_NUMBER, "2", AUTO_MAINTENANCE_MODE_DISABLED, "true"));
+    verify(_dataAccessor, times(1)).updateChildren(anyList(), anyList(), anyInt());
+    verify(_configAccessor, times(1)).updateClusterConfig(anyString(), any());
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class,
+      expectedExceptionsMessageRegExp = "This operation is not allowed if cluster is already in maintenance mode.*")
+  public void testMaintenanceModeCheckBeforeApiCall() {
+    // AUTO_MAINTENANCE_MODE_DISABLED is not passed, and the default stub reports maintenance mode.
+    _service.addVirtualTopologyGroup(TEST_CLUSTER0, ImmutableMap.of(GROUP_NAME, "test-group", GROUP_NUMBER, "2"));
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class,
+  expectedExceptionsMessageRegExp = "Cluster is not in maintenance mode. This is required for virtual topology group setting. "
+      + "Please set autoMaintenanceModeDisabled=false.*")
+  public void testMaintenanceModeCheckAfter() {
+    // Report "not in maintenance mode" for this call only. The shared default stub is restored in
+    // finally so that other test methods are unaffected.
+    try {
+      when(_helixAdmin.isInMaintenanceMode(anyString())).thenReturn(false);
+      _service.addVirtualTopologyGroup(TEST_CLUSTER0, ImmutableMap.of(GROUP_NAME, "test-group", GROUP_NUMBER, "2"));
+    } finally {
+      when(_helixAdmin.isInMaintenanceMode(anyString())).thenReturn(true);
+    }
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class,
+      expectedExceptionsMessageRegExp = "Number of virtual groups cannot be greater than the number of instances.*")
+  public void testNumberOfInstanceCheck() {
+    // 10 groups are requested, but the mocked topology contains only 3 instances.
+    _service.addVirtualTopologyGroup(TEST_CLUSTER0, ImmutableMap.of(
+        GROUP_NAME, "test-group", GROUP_NUMBER, "10", AUTO_MAINTENANCE_MODE_DISABLED, "true"));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testParamValidation() {
+    // GROUP_NAME is missing from the request parameters.
+    _service.addVirtualTopologyGroup(TEST_CLUSTER0, ImmutableMap.of(GROUP_NUMBER, "2"));
+  }
+
+  /**
+   * Applies the precomputed updater for the instance's zk path and checks that the resulting domain
+   * keeps the existing zone entry and gains the virtual fault zone entry.
+   */
+  @Test(dataProvider = "instanceTestProvider")
+  public void testInstanceConfigUpdater(String zkPath, InstanceConfig instanceConfig, Map<String, String> expectedDomain) {
+    ZNRecord update = _updaterMap.get(zkPath).update(instanceConfig.getRecord());
+    InstanceConfig updatedConfig = new InstanceConfig(update);
+    Assert.assertEquals(updatedConfig.getDomainAsMap(), expectedDomain);
+  }
+
+  // Rows: instance config zk path, current instance config, expected domain after the update.
+  @DataProvider
+  public Object[][] instanceTestProvider() {
+    return new Object[][] {
+        {computeZkPath("instance_0"), _instanceConfig0,
+            ImmutableMap.of("helixZoneId", "zone0", VIRTUAL_FAULT_ZONE_TYPE, "virtual_group_0")},
+        {computeZkPath("instance_1"), _instanceConfig1,
+            ImmutableMap.of("helixZoneId", "zone0", VIRTUAL_FAULT_ZONE_TYPE, "virtual_group_0")},
+        {computeZkPath("instance_2"), _instanceConfig2,
+            ImmutableMap.of("helixZoneId", "zone1", VIRTUAL_FAULT_ZONE_TYPE, "virtual_group_1")}
+    };
+  }
+
+  @Test
+  public void testVirtualTopologyString() {
+    // The end node type ("instance") is placed under the virtual fault zone type.
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    testConfig.setTopologyAwareEnabled(true);
+    testConfig.setTopology("/zone/instance");
+    Assert.assertEquals(VirtualTopologyGroupService.computeVirtualTopologyString(testConfig),
+        "/virtualZone/instance");
+  }
+
+  /** Mocked topology for TEST_CLUSTER0: zone0 = {instance_0, instance_1}, zone1 = {instance_2}. */
+  private static ClusterTopology prepareClusterTopology() {
+    List<ClusterTopology.Zone> zones = ImmutableList.of(
+        new ClusterTopology.Zone("zone0", ImmutableList.of(
+            new ClusterTopology.Instance("instance_0"), new ClusterTopology.Instance("instance_1"))),
+        new ClusterTopology.Zone("zone1", ImmutableList.of(new ClusterTopology.Instance("instance_2"))));
+    return new ClusterTopology(TEST_CLUSTER0, zones, ImmutableSet.of("instance_0", "instance_1", "instance_2"));
+  }
+
+  /**
+   * Zk path of the participant config for an instance of TEST_CLUSTER. It must equal the keys
+   * produced by createInstanceConfigUpdater, because it is used to look up entries in _updaterMap.
+   */
+  private static String computeZkPath(String instanceName) {
+    HelixConfigScope scope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT)
+        .forCluster(TEST_CLUSTER)
+        .forParticipant(instanceName)
+        .build();
+    return scope.getZkPath();
+  }
+}

[helix] 01/04: Fix #1946 -- Refactor and move ClusterTopologyConfig

Posted by jx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 47ee384e1e6ae7d75e6a0736b30aa1b1b7b64d96
Author: Qi (Quincy) Qu <qq...@linkedin.com>
AuthorDate: Fri Jan 28 16:14:09 2022 -0800

    Fix #1946 -- Refactor and move ClusterTopologyConfig
    
    Move ClusterTopologyConfig from a nested class in Topology to a standalone class in helix/model so that it can be used by the virtual topology group logic.
---
 .../controller/rebalancer/topology/Topology.java   | 120 ++++++---------------
 .../apache/helix/model/ClusterTopologyConfig.java  | 101 +++++++++++++++++
 .../helix/model/TestClusterTopologyConfig.java     |  84 +++++++++++++++
 3 files changed, 216 insertions(+), 89 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
index 3d2a878..f35b637 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
@@ -30,6 +30,7 @@ import java.util.Set;
 
 import org.apache.helix.HelixException;
 import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ClusterTopologyConfig;
 import org.apache.helix.model.InstanceConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,15 +54,7 @@ public class Topology {
   private final List<String> _allInstances;
   private final List<String> _liveInstances;
   private final Map<String, InstanceConfig> _instanceConfigMap;
-  private final ClusterConfig _clusterConfig;
-  private static final String DEFAULT_DOMAIN_PREFIX = "Helix_default_";
-
-  static class ClusterTopologyConfig {
-    String endNodeType;
-    String faultZoneType;
-    LinkedHashMap<String, String> topologyKeyDefaultValue = new LinkedHashMap<>();
-  }
-  private ClusterTopologyConfig _clusterTopologyConfig;
+  private final ClusterTopologyConfig _clusterTopologyConfig;
 
   public Topology(final List<String> allNodes, final List<String> liveNodes,
       final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig) {
@@ -78,18 +71,16 @@ public class Topology {
       throw new HelixException(String.format("Config for instances %s is not found!",
           _allInstances.removeAll(_instanceConfigMap.keySet())));
     }
-    _clusterConfig = clusterConfig;
-    _clusterTopologyConfig = getClusterTopologySetting(clusterConfig);
-
-    _root = createClusterTree();
+    _clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
+    _root = createClusterTree(clusterConfig);
   }
 
   public String getEndNodeType() {
-    return _clusterTopologyConfig.endNodeType;
+    return _clusterTopologyConfig.getEndNodeType();
   }
 
   public String getFaultZoneType() {
-    return _clusterTopologyConfig.faultZoneType;
+    return _clusterTopologyConfig.getFaultZoneType();
   }
 
   public Node getRootNode() {
@@ -158,7 +149,7 @@ public class Topology {
     return newRoot;
   }
 
-  private Node createClusterTree() {
+  private Node createClusterTree(ClusterConfig clusterConfig) {
     // root
     Node root = new Node();
     root.setName("root");
@@ -171,94 +162,46 @@ public class Topology {
       InstanceConfig insConfig = _instanceConfigMap.get(instanceName);
       try {
         LinkedHashMap<String, String> instanceTopologyMap =
-            computeInstanceTopologyMapHelper(_clusterConfig.isTopologyAwareEnabled(), instanceName,
-                insConfig, _clusterTopologyConfig.topologyKeyDefaultValue,
-                null /*faultZoneForEarlyQuit*/);
+            computeInstanceTopologyMapHelper(_clusterTopologyConfig, instanceName, insConfig, null);
         int weight = insConfig.getWeight();
         if (weight < 0 || weight == InstanceConfig.WEIGHT_NOT_SET) {
           weight = DEFAULT_NODE_WEIGHT;
         }
         addEndNode(root, instanceName, instanceTopologyMap, weight, _liveInstances);
       } catch (IllegalArgumentException e) {
-        if (isInstanceEnabled(_clusterConfig, instanceName, insConfig)) {
+        if (isInstanceEnabled(clusterConfig, instanceName, insConfig)) {
           throw e;
         } else {
-          logger
-              .warn("Topology setting {} for instance {} is unset or invalid, ignore the instance!",
-                  insConfig.getDomainAsString(), instanceName);
+          logger.warn("Topology setting {} for instance {} is unset or invalid, ignore the instance!",
+              insConfig.getDomainAsString(), instanceName);
         }
       }
     }
     return root;
   }
 
-  private boolean isInstanceEnabled(ClusterConfig clusterConfig, String instanceName,
+  private static boolean isInstanceEnabled(ClusterConfig clusterConfig, String instanceName,
       InstanceConfig instanceConfig) {
     return (instanceConfig.getInstanceEnabled() && (clusterConfig.getDisabledInstances() == null
         || !clusterConfig.getDisabledInstances().containsKey(instanceName)));
   }
 
   /**
-   * Populate faultZone, endNodetype and and a LinkedHashMap containing pathKeys default values for
-   * clusterConfig.Topology. The LinkedHashMap will be empty if clusterConfig.Topology is unset.
-   *
-   * @return an Instance of Topology.ClusterTopologyConfig.
-   */
-  private static ClusterTopologyConfig getClusterTopologySetting(ClusterConfig clusterConfig) {
-
-    ClusterTopologyConfig clusterTopologyConfig = new ClusterTopologyConfig();
-    if (clusterConfig.isTopologyAwareEnabled()) {
-      String topologyDef = clusterConfig.getTopology();
-      if (topologyDef != null) {
-        String[] topologyKeys = topologyDef.trim().split("/");
-        int lastValidTypeIdx = 0;
-        for (int i = 0; i < topologyKeys.length; i++) {
-          if (topologyKeys[i].length() != 0) {
-            clusterTopologyConfig.topologyKeyDefaultValue
-                .put(topologyKeys[i], DEFAULT_DOMAIN_PREFIX + topologyKeys[i]);
-            lastValidTypeIdx = i;
-          }
-        }
-        if (clusterTopologyConfig.topologyKeyDefaultValue.size() == 0) {
-          throw new IllegalArgumentException("Invalid cluster topology definition " + topologyDef);
-        }
-        clusterTopologyConfig.endNodeType = topologyKeys[lastValidTypeIdx];
-        String faultZoneType = clusterConfig.getFaultZoneType();
-        if (faultZoneType == null) {
-          clusterTopologyConfig.faultZoneType = clusterTopologyConfig.endNodeType;
-        } else if (!clusterTopologyConfig.topologyKeyDefaultValue.containsKey(faultZoneType)) {
-          throw new HelixException(String
-              .format("Invalid fault zone type %s, not present in topology definition %s.",
-                  faultZoneType, clusterConfig.getTopology()));
-        } else {
-          clusterTopologyConfig.faultZoneType = faultZoneType;
-        }
-      } else {
-        // Use default cluster topology definition, i,e. /root/zone/instance
-        clusterTopologyConfig.endNodeType = Types.INSTANCE.name();
-        clusterTopologyConfig.faultZoneType = Types.ZONE.name();
-      }
-    } else {
-      clusterTopologyConfig.endNodeType = Types.INSTANCE.name();
-      clusterTopologyConfig.faultZoneType = Types.INSTANCE.name();
-    }
-    return clusterTopologyConfig;
-  }
-
-  /**
-   * @param clusterTopologyKeyDefaultValue  a LinkedHashMap where keys are cluster topology path and
-   *                                       values are their corresponding default value. The entries
-   *                                        are ordered by ClusterConfig.topology setting.
-   * @param faultZoneForEarlyQuit   this flag is set to true only if caller wants the path
-   *                                to faultZone instead the whole path for the instance.
+   * Construct the topology map for an instance: each cluster topology key mapped to this instance's value.
+   * Keys missing from the instance domain fall back to the cluster topology default value.
+   * @param clusterTopologyConfig parsed cluster topology settings
+   * @param instanceName name of the instance, used in error and log messages
+   * @param instanceConfig config of the instance whose zone id / domain is read
+   * @param faultZoneForEarlyQuit  nullable; if non-null, the path stops at this fault zone type instead of
+   *                               covering the whole topology path for the instance.
    */
   private static LinkedHashMap<String, String> computeInstanceTopologyMapHelper(
-      boolean isTopologyAwareEnabled, String instanceName, InstanceConfig instanceConfig,
-      LinkedHashMap<String, String> clusterTopologyKeyDefaultValue, String faultZoneForEarlyQuit)
+      ClusterTopologyConfig clusterTopologyConfig, String instanceName, InstanceConfig instanceConfig,
+      String faultZoneForEarlyQuit)
       throws IllegalArgumentException {
     LinkedHashMap<String, String> instanceTopologyMap = new LinkedHashMap<>();
-    if (isTopologyAwareEnabled) {
-      if (clusterTopologyKeyDefaultValue.size() == 0) {
+    if (clusterTopologyConfig.isTopologyAwareEnabled()) {
+      if (clusterTopologyConfig.getTopologyKeyDefaultValue().isEmpty()) {
         // Return a ordered map using default cluster topology definition, i,e. /root/zone/instance
         String zone = instanceConfig.getZoneId();
         if (zone == null) {
@@ -283,11 +226,11 @@ public class Topology {
                   instanceName));
         }
         int numOfMatchedKeys = 0;
-        for (String key : clusterTopologyKeyDefaultValue.keySet()) {
+        for (String key : clusterTopologyConfig.getTopologyKeyDefaultValue().keySet()) {
           // if a key does not exist in the instance domain config, using the default domain value.
           String value = domainAsMap.get(key);
           if (value == null || value.length() == 0) {
-            value = clusterTopologyKeyDefaultValue.get(key);
+            value = clusterTopologyConfig.getTopologyKeyDefaultValue().get(key);
           } else {
             numOfMatchedKeys++;
           }
@@ -300,7 +243,7 @@ public class Topology {
           logger.warn(
               "Key-value pairs in InstanceConfig.Domain {} do not align with keys in ClusterConfig.Topology "
                   + "{}, using default domain value instead", instanceConfig.getDomainAsString(),
-              clusterTopologyKeyDefaultValue.keySet());
+              clusterTopologyConfig.getTopologyKeyDefaultValue().keySet());
         }
       }
     } else {
@@ -327,11 +270,10 @@ public class Topology {
   public static LinkedHashMap<String, String> computeInstanceTopologyMap(
       ClusterConfig clusterConfig, String instanceName, InstanceConfig instanceConfig,
       boolean earlyQuitForFaultZone) {
-    ClusterTopologyConfig clusterTopologyConfig = getClusterTopologySetting(clusterConfig);
+    ClusterTopologyConfig clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
     String faultZoneForEarlyQuit =
-        earlyQuitForFaultZone ? clusterTopologyConfig.faultZoneType : null;
-    return computeInstanceTopologyMapHelper(clusterConfig.isTopologyAwareEnabled(), instanceName,
-        instanceConfig, clusterTopologyConfig.topologyKeyDefaultValue, faultZoneForEarlyQuit);
+        earlyQuitForFaultZone ? clusterTopologyConfig.getFaultZoneType() : null;
+    return computeInstanceTopologyMapHelper(clusterTopologyConfig, instanceName, instanceConfig, faultZoneForEarlyQuit);
   }
 
   /**
@@ -349,7 +291,7 @@ public class Topology {
       if (!current.hasChild(pathValue)) {
         buildNewNode(pathValue, path, current, instanceName, instanceWeight,
             liveInstances.contains(instanceName), pathNodes);
-      } else if (path.equals(_clusterTopologyConfig.endNodeType)) {
+      } else if (path.equals(_clusterTopologyConfig.getEndNodeType())) {
         throw new HelixException(
             "Failed to add topology node because duplicate leaf nodes are not allowed. Duplicate node name: "
                 + pathValue);
@@ -366,7 +308,7 @@ public class Topology {
     n.setType(type);
     n.setParent(parent);
     // if it is leaf node, create an InstanceNode instead
-    if (type.equals(_clusterTopologyConfig.endNodeType)) {
+    if (type.equals(_clusterTopologyConfig.getEndNodeType())) {
       n = new InstanceNode(n, instanceName);
       if (isLiveInstance) {
         // node is alive
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterTopologyConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterTopologyConfig.java
new file mode 100644
index 0000000..f7ae740
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterTopologyConfig.java
@@ -0,0 +1,101 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.LinkedHashMap;
+import org.apache.helix.HelixException;
+import org.apache.helix.controller.rebalancer.topology.Topology;
+
+
+/**
+ * Immutable view of the topology-related settings of a {@link ClusterConfig}: whether topology
+ * awareness is enabled, the end (leaf) node type, the fault zone type, and the ordered topology keys
+ * with their default domain values.
+ */
+public class ClusterTopologyConfig {
+  private static final String DEFAULT_DOMAIN_PREFIX = "Helix_default_";
+  private static final String TOPOLOGY_SPLITTER = "/";
+
+  private final boolean _topologyAwareEnabled;
+  private final String _endNodeType;
+  private final String _faultZoneType;
+  private final LinkedHashMap<String, String> _topologyKeyDefaultValue;
+
+  private ClusterTopologyConfig(boolean topologyAwareEnabled, String endNodeType, String faultZoneType,
+      LinkedHashMap<String, String> topologyKeyDefaultValue) {
+    this._topologyAwareEnabled = topologyAwareEnabled;
+    this._endNodeType = endNodeType;
+    this._faultZoneType = faultZoneType;
+    this._topologyKeyDefaultValue = topologyKeyDefaultValue;
+  }
+
+  /**
+   * Builds the topology settings described by a {@link ClusterConfig}.
+   * <ul>
+   *   <li>Topology awareness disabled: the end node type and the fault zone type are both
+   *   {@code INSTANCE}, and the key map is empty.</li>
+   *   <li>Topology awareness enabled without a topology definition: the default /root/zone/instance
+   *   layout is used, i.e. end node type {@code INSTANCE}, fault zone type {@code ZONE}, and an
+   *   empty key map.</li>
+   *   <li>Topology awareness enabled with a definition such as "/zone/instance": every non-empty
+   *   segment becomes a key (in definition order) whose default value is "Helix_default_" + key.
+   *   The last segment is the end node type. The fault zone type is the cluster's configured fault
+   *   zone type, or the end node type when none is configured.</li>
+   * </ul>
+   *
+   * @param clusterConfig the cluster config to read
+   * @return an instance of {@link ClusterTopologyConfig}
+   * @throws IllegalArgumentException if the topology definition contains no non-empty segment
+   * @throws HelixException if the configured fault zone type is not a key of the topology definition
+   */
+  public static ClusterTopologyConfig createFromClusterConfig(ClusterConfig clusterConfig) {
+    String instanceType = Topology.Types.INSTANCE.name();
+    if (!clusterConfig.isTopologyAwareEnabled()) {
+      return new ClusterTopologyConfig(false, instanceType, instanceType, new LinkedHashMap<>());
+    }
+
+    String topologyDef = clusterConfig.getTopology();
+    if (topologyDef == null) {
+      // Default cluster topology definition, i.e. /root/zone/instance.
+      return new ClusterTopologyConfig(true, instanceType, Topology.Types.ZONE.name(),
+          new LinkedHashMap<>());
+    }
+
+    LinkedHashMap<String, String> keyDefaults = new LinkedHashMap<>();
+    String leafType = instanceType;
+    for (String segment : topologyDef.trim().split(TOPOLOGY_SPLITTER)) {
+      if (segment.isEmpty()) {
+        continue; // tolerate leading, trailing or repeated separators
+      }
+      keyDefaults.put(segment, DEFAULT_DOMAIN_PREFIX + segment);
+      leafType = segment;
+    }
+    if (keyDefaults.isEmpty()) {
+      throw new IllegalArgumentException("Invalid cluster topology definition " + topologyDef);
+    }
+
+    String configuredFaultZone = clusterConfig.getFaultZoneType();
+    if (configuredFaultZone == null) {
+      // No explicit fault zone: the leaf type doubles as the fault zone type.
+      return new ClusterTopologyConfig(true, leafType, leafType, keyDefaults);
+    }
+    if (!keyDefaults.containsKey(configuredFaultZone)) {
+      throw new HelixException(
+          String.format("Invalid fault zone type %s, not present in topology definition %s.",
+              configuredFaultZone, clusterConfig.getTopology()));
+    }
+    return new ClusterTopologyConfig(true, leafType, configuredFaultZone, keyDefaults);
+  }
+
+  /** @return whether topology awareness was enabled in the source cluster config. */
+  public boolean isTopologyAwareEnabled() {
+    return _topologyAwareEnabled;
+  }
+
+  /** @return the type of the end (leaf) nodes of the topology. */
+  public String getEndNodeType() {
+    return _endNodeType;
+  }
+
+  /** @return the topology type used as the fault zone. */
+  public String getFaultZoneType() {
+    return _faultZoneType;
+  }
+
+  /**
+   * Returns the topology keys, in definition order, mapped to their default domain values.
+   * The map is empty when topology awareness is disabled or no topology is defined. This is the
+   * internal map, not a copy, so callers should treat it as read-only.
+   */
+  public LinkedHashMap<String, String> getTopologyKeyDefaultValue() {
+    return _topologyKeyDefaultValue;
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestClusterTopologyConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestClusterTopologyConfig.java
new file mode 100644
index 0000000..8235312
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterTopologyConfig.java
@@ -0,0 +1,84 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Iterator;
+import org.apache.helix.HelixException;
+import org.apache.helix.controller.rebalancer.topology.Topology;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for {@link ClusterTopologyConfig#createFromClusterConfig(ClusterConfig)} covering every branch:
+ * topology awareness disabled, enabled with the default topology, enabled with an explicit
+ * topology (with and without a fault zone type), and invalid definitions.
+ */
+public class TestClusterTopologyConfig {
+
+  @Test
+  public void testClusterNonTopologyAware() {
+    // Topology awareness disabled: INSTANCE is both the end node type and the fault zone type.
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    testConfig.setTopologyAwareEnabled(false);
+    ClusterTopologyConfig clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(testConfig);
+    Assert.assertFalse(clusterTopologyConfig.isTopologyAwareEnabled());
+    Assert.assertEquals(clusterTopologyConfig.getEndNodeType(), Topology.Types.INSTANCE.name());
+    Assert.assertEquals(clusterTopologyConfig.getFaultZoneType(), Topology.Types.INSTANCE.name());
+    Assert.assertTrue(clusterTopologyConfig.getTopologyKeyDefaultValue().isEmpty());
+  }
+
+  @Test
+  public void testClusterDefaultTopology() {
+    // Topology aware without a topology definition: default /root/zone/instance layout.
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    testConfig.setTopologyAwareEnabled(true);
+    ClusterTopologyConfig clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(testConfig);
+    Assert.assertTrue(clusterTopologyConfig.isTopologyAwareEnabled());
+    Assert.assertEquals(clusterTopologyConfig.getEndNodeType(), Topology.Types.INSTANCE.name());
+    Assert.assertEquals(clusterTopologyConfig.getFaultZoneType(), Topology.Types.ZONE.name());
+    Assert.assertTrue(clusterTopologyConfig.getTopologyKeyDefaultValue().isEmpty());
+  }
+
+  @Test
+  public void testClusterValidTopology() {
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    testConfig.setTopologyAwareEnabled(true);
+    testConfig.setTopology("/zone/instance");
+    // no fault zone setup: the end node type doubles as the fault zone type
+    ClusterTopologyConfig clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(testConfig);
+    Assert.assertTrue(clusterTopologyConfig.isTopologyAwareEnabled());
+    Assert.assertEquals(clusterTopologyConfig.getEndNodeType(), "instance");
+    Assert.assertEquals(clusterTopologyConfig.getFaultZoneType(), "instance");
+    assertKeysInOrder(clusterTopologyConfig, "zone", "instance");
+    Assert.assertEquals(clusterTopologyConfig.getTopologyKeyDefaultValue().get("zone"), "Helix_default_zone");
+    // with fault zone; surrounding whitespace in the definition is ignored
+    testConfig.setFaultZoneType("zone");
+    testConfig.setTopology(" /zone/instance  ");
+    clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(testConfig);
+    Assert.assertEquals(clusterTopologyConfig.getEndNodeType(), "instance");
+    Assert.assertEquals(clusterTopologyConfig.getFaultZoneType(), "zone");
+    assertKeysInOrder(clusterTopologyConfig, "zone", "instance");
+
+    testConfig.setTopology("/rack/zone/instance");
+    clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(testConfig);
+    Assert.assertEquals(clusterTopologyConfig.getEndNodeType(), "instance");
+    Assert.assertEquals(clusterTopologyConfig.getFaultZoneType(), "zone");
+    assertKeysInOrder(clusterTopologyConfig, "rack", "zone", "instance");
+  }
+
+  @Test(expectedExceptions = HelixException.class)
+  public void testClusterInvalidTopology() {
+    // The fault zone type "rack" is not part of the topology definition.
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    testConfig.setTopologyAwareEnabled(true);
+    testConfig.setTopology("/zone/instance");
+    testConfig.setFaultZoneType("rack");
+    ClusterTopologyConfig.createFromClusterConfig(testConfig);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testClusterEmptyTopology() {
+    // A definition without any non-empty segment is rejected.
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    testConfig.setTopologyAwareEnabled(true);
+    testConfig.setTopology("/");
+    ClusterTopologyConfig.createFromClusterConfig(testConfig);
+  }
+
+  /** Asserts that the topology keys are exactly {@code expectedKeys}, in the given order. */
+  private static void assertKeysInOrder(ClusterTopologyConfig config, String... expectedKeys) {
+    Assert.assertEquals(config.getTopologyKeyDefaultValue().size(), expectedKeys.length);
+    Iterator<String> itr = config.getTopologyKeyDefaultValue().keySet().iterator();
+    for (String expected : expectedKeys) {
+      // TestNG's Assert.assertEquals takes (actual, expected).
+      Assert.assertEquals(itr.next(), expected);
+    }
+  }
+}

[helix] 04/04: Add rest endpoint for virtual topology group (#1958)

Posted by jx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit de572a6192f4186cc1cabe8ee8ceb38a7d3dc615
Author: Qi (Quincy) Qu <qq...@linkedin.com>
AuthorDate: Wed Feb 16 12:55:56 2022 -0500

    Add rest endpoint for virtual topology group (#1958)
---
 .../rest/server/resources/AbstractResource.java    |   1 +
 .../server/resources/helix/ClusterAccessor.java    |  25 ++++
 .../helix/rest/server/TestClusterAccessor.java     | 156 ++++++++++++++++-----
 3 files changed, 147 insertions(+), 35 deletions(-)

diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
index a709d6a..b7d02a7 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
@@ -59,6 +59,7 @@ public class AbstractResource {
   public enum Command {
     activate,
     addInstanceTag,
+    addVirtualTopologyGroup,
     expand,
     enable,
     disable,
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
index cf2457c..daea45f 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
@@ -70,6 +70,7 @@ import org.apache.helix.rest.common.HttpConstants;
 import org.apache.helix.rest.server.json.cluster.ClusterTopology;
 import org.apache.helix.rest.server.service.ClusterService;
 import org.apache.helix.rest.server.service.ClusterServiceImpl;
+import org.apache.helix.rest.server.service.VirtualTopologyGroupService;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -235,6 +236,21 @@ public class ClusterAccessor extends AbstractHelixResource {
         }
         break;
 
+      case addVirtualTopologyGroup:
+        try {
+          addVirtualTopologyGroup(clusterId, content);
+        } catch (JsonProcessingException ex) {
+          LOG.error("Failed to parse json string: {}", content, ex);
+          return badRequest("Invalid payload json body: " + content);
+        } catch (IllegalArgumentException ex) {
+          LOG.error("Illegal input {} for command {}.", content, command, ex);
+          return badRequest(String.format("Illegal input %s for command %s", content, command));
+        } catch (Exception ex) {
+          LOG.error("Failed to add virtual topology group to cluster {}", clusterId, ex);
+          return serverError(ex);
+        }
+        break;
+
       case expand:
         try {
           clusterSetup.expandCluster(clusterId);
@@ -305,6 +321,29 @@ public class ClusterAccessor extends AbstractHelixResource {
     return OK();
   }
 
+  /**
+   * Handles the {@code addVirtualTopologyGroup} command: parses the request body into string
+   * parameters and passes them to {@link VirtualTopologyGroupService#addVirtualTopologyGroup}.
+   *
+   * @param clusterId cluster to update
+   * @param content request body, expected to be a JSON object
+   * @throws JsonProcessingException if the body cannot be parsed into a {@code Map<String, String>}
+   * @throws IllegalArgumentException if the body is the JSON literal {@code null} (the service may
+   *     presumably also throw it for invalid parameters)
+   */
+  private void addVirtualTopologyGroup(String clusterId, String content) throws JsonProcessingException {
+    Map<String, String> params =
+        OBJECT_MAPPER.readValue(content, new TypeReference<HashMap<String, String>>() { });
+    if (params == null) {
+      // A "null" body deserializes to null; reject it here rather than handing null to the service.
+      throw new IllegalArgumentException("Request body must be a JSON object");
+    }
+    ClusterService clusterService = new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor());
+    VirtualTopologyGroupService service = new VirtualTopologyGroupService(
+        getHelixAdmin(), clusterService, getConfigAccessor(), getDataAccssor(clusterId));
+    service.addVirtualTopologyGroup(clusterId, params);
+  }
+
   @ResponseMetered(name = HttpConstants.READ_REQUEST)
   @Timed(name = HttpConstants.READ_REQUEST)
   @GET
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
index 3bb3c29..34cfbaf 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
@@ -19,6 +19,7 @@ package org.apache.helix.rest.server;
  * under the License.
  */
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -45,6 +46,7 @@ import org.apache.helix.api.status.ClusterManagementMode;
 import org.apache.helix.api.status.ClusterManagementModeRequest;
 import org.apache.helix.cloud.azure.AzureConstants;
 import org.apache.helix.cloud.constants.CloudProvider;
+import org.apache.helix.cloud.constants.VirtualTopologyGroupConstants;
 import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
 import org.apache.helix.integration.manager.ClusterDistributedController;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -70,10 +72,13 @@ import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 public class TestClusterAccessor extends AbstractTestClass {
 
+  private static final String VG_CLUSTER = "vgCluster";
+
   @BeforeClass
   public void beforeClass() {
     for (String cluster : _clusters) {
@@ -167,10 +172,7 @@ public class TestClusterAccessor extends AbstractTestClass {
     updateClusterConfigFromRest(cluster, configDelta, Command.update);
 
     //get valid cluster topology map
-    String topologyMapDef = get(topologyMapUrlBase, null, Response.Status.OK.getStatusCode(), true);
-    Map<String, Object> topologyMap =
-        OBJECT_MAPPER.readValue(topologyMapDef, new TypeReference<HashMap<String, Object>>() {
-        });
+    Map<String, Object> topologyMap = getMapResponseFromRest(topologyMapUrlBase);
     Assert.assertEquals(topologyMap.size(), 2);
     Assert.assertTrue(topologyMap.get("/helixZoneId:zone0") instanceof List);
     List<String> instances = (List<String>) topologyMap.get("/helixZoneId:zone0");
@@ -197,10 +199,7 @@ public class TestClusterAccessor extends AbstractTestClass {
     updateClusterConfigFromRest(cluster, configDelta, Command.update);
 
     //get valid cluster fault zone map
-    String faultZoneMapDef = get(faultZoneUrlBase, null, Response.Status.OK.getStatusCode(), true);
-    Map<String, Object> faultZoneMap =
-        OBJECT_MAPPER.readValue(faultZoneMapDef, new TypeReference<HashMap<String, Object>>() {
-        });
+    Map<String, Object> faultZoneMap = getMapResponseFromRest(faultZoneUrlBase);
     Assert.assertEquals(faultZoneMap.size(), 2);
     Assert.assertTrue(faultZoneMap.get("/helixZoneId:zone0") instanceof List);
     instances = (List<String>) faultZoneMap.get("/helixZoneId:zone0");
@@ -223,6 +222,108 @@ public class TestClusterAccessor extends AbstractTestClass {
             "/instance:TestCluster_1localhost_12927"))));
   }
 
+  @Test(dataProvider = "prepareVirtualTopologyTests", dependsOnMethods = "testGetClusters")
+  public void testAddVirtualTopologyGroup(String requestParam, int numGroups,
+      Map<String, String> instanceToGroup) throws IOException {
+    // Apply the virtual topology group command through the REST endpoint.
+    post("clusters/" + VG_CLUSTER, ImmutableMap.of("command", "addVirtualTopologyGroup"),
+        Entity.entity(requestParam, MediaType.APPLICATION_JSON_TYPE), Response.Status.OK.getStatusCode());
+    // The topology endpoint should now report exactly numGroups zones.
+    Map<String, Object> topology = getMapResponseFromRest(String.format("clusters/%s/topology", VG_CLUSTER));
+    Assert.assertTrue(topology.containsKey("zones"));
+    Assert.assertEquals(((List<?>) topology.get("zones")).size(), numGroups);
+    // Expect topology "/<virtual zone type>/hostname" with the virtual zone type as the fault zone.
+    String virtualZoneType = VirtualTopologyGroupConstants.VIRTUAL_FAULT_ZONE_TYPE;
+    ClusterConfig clusterConfig = getClusterConfigFromRest(VG_CLUSTER);
+    Assert.assertEquals(clusterConfig.getTopology(), "/" + virtualZoneType + "/hostname");
+    Assert.assertEquals(clusterConfig.getFaultZoneType(), virtualZoneType);
+
+    // Each sampled instance must carry its expected virtual group in its domain.
+    HelixDataAccessor helixDataAccessor = new ZKHelixDataAccessor(VG_CLUSTER, _baseAccessor);
+    for (Map.Entry<String, String> entry : instanceToGroup.entrySet()) {
+      InstanceConfig instanceConfig =
+          helixDataAccessor.getProperty(helixDataAccessor.keyBuilder().instanceConfig(entry.getKey()));
+      Assert.assertEquals(instanceConfig.getDomainAsMap().get(virtualZoneType), entry.getValue(),
+          "Unexpected virtual group for instance " + entry.getKey());
+    }
+  }
+
+  @Test(dependsOnMethods = "testGetClusters")
+  public void testVirtualTopologyGroupMaintenanceMode() throws JsonProcessingException {
+    setupClusterForVirtualTopology(VG_CLUSTER);
+    String requestParam = "{\"virtualTopologyGroupNumber\":\"7\",\"virtualTopologyGroupName\":\"vgTest\","
+        + "\"autoMaintenanceModeDisabled\":\"true\"}";
+    // expect failure as cluster is not in maintenance mode while autoMaintenanceModeDisabled=true
+    post("clusters/" + VG_CLUSTER, ImmutableMap.of("command", "addVirtualTopologyGroup"),
+        Entity.entity(requestParam, MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
+    // enable maintenance mode first, after which the same request is expected to succeed
+    post("clusters/" + VG_CLUSTER, ImmutableMap.of("command", "enableMaintenanceMode"),
+        Entity.entity("virtual group", MediaType.APPLICATION_JSON_TYPE), Response.Status.OK.getStatusCode());
+    post("clusters/" + VG_CLUSTER, ImmutableMap.of("command", "addVirtualTopologyGroup"),
+        Entity.entity(requestParam, MediaType.APPLICATION_JSON_TYPE), Response.Status.OK.getStatusCode());
+    // the cluster is expected to still be in maintenance mode after the command
+    Assert.assertTrue(isMaintenanceModeEnabled(VG_CLUSTER));
+
+    // clean up so the shared VG_CLUSTER is not left in maintenance mode for later tests
+    post("clusters/" + VG_CLUSTER, ImmutableMap.of("command", "disableMaintenanceMode"),
+        Entity.entity("", MediaType.APPLICATION_JSON_TYPE), Response.Status.OK.getStatusCode());
+    Assert.assertFalse(isMaintenanceModeEnabled(VG_CLUSTER));
+  }
+
+  private boolean isMaintenanceModeEnabled(String clusterName) throws JsonProcessingException {
+    // GET clusters/{clusterName}/maintenance (expecting 200 OK) and read its boolean "maintenance" field.
+    String responseBody = get("clusters/" + clusterName + "/maintenance", null, Response.Status.OK.getStatusCode(), true);
+    return OBJECT_MAPPER.readTree(responseBody).get(ClusterAccessor.ClusterProperties.maintenance.name()).booleanValue();
+  }
+
+  @DataProvider
+  public Object[][] prepareVirtualTopologyTests() {
+    // Set up VG_CLUSTER before the test cases below are handed to the test method.
+    setupClusterForVirtualTopology(VG_CLUSTER);
+    String sevenGroups = "{\"virtualTopologyGroupNumber\":\"7\",\"virtualTopologyGroupName\":\"vgTest\"}";
+    String nineGroups = "{\"virtualTopologyGroupNumber\":\"9\",\"virtualTopologyGroupName\":\"vgTest\"}";
+    // Expected virtual group for a sample of the instances created by setupClusterForVirtualTopology.
+    Map<String, String> expectedForSeven = ImmutableMap.of(
+        "vgCluster_localhost_12918", "vgTest_0",
+        "vgCluster_localhost_12919", "vgTest_0",
+        "vgCluster_localhost_12925", "vgTest_4",
+        "vgCluster_localhost_12927", "vgTest_6");
+    Map<String, String> expectedForNine = ImmutableMap.of(
+        "vgCluster_localhost_12918", "vgTest_0",
+        "vgCluster_localhost_12919", "vgTest_0",
+        "vgCluster_localhost_12925", "vgTest_6",
+        "vgCluster_localhost_12927", "vgTest_8");
+    return new Object[][] {
+        {sevenGroups, 7, expectedForSeven},
+        {nineGroups, 9, expectedForNine},
+        // Re-applying 7 groups checks the assignment is deterministic and that decreasing numGroups works.
+        {sevenGroups, 7, expectedForSeven}
+    };
+  }
+
+  private void setupClusterForVirtualTopology(String clusterName) {
+    HelixDataAccessor helixDataAccessor = new ZKHelixDataAccessor(clusterName, _baseAccessor);
+    // Cloud-enabled cluster config: provider AZURE, cloud id "TestCloudID".
+    ZNRecord cloudRecord = new ZNRecord("testZnode");
+    cloudRecord.setBooleanField(CloudConfig.CloudConfigProperty.CLOUD_ENABLED.name(), true);
+    cloudRecord.setSimpleField(CloudConfig.CloudConfigProperty.CLOUD_ID.name(), "TestCloudID");
+    cloudRecord.setSimpleField(CloudConfig.CloudConfigProperty.CLOUD_PROVIDER.name(), CloudProvider.AZURE.name());
+    _gSetupTool.addCluster(clusterName, true, new CloudConfig.Builder(cloudRecord).build());
+    // Ten instances <clusterName>_localhost_12918..12927; faultDomain = i / 2 groups them in pairs.
+    Set<String> instances = new HashSet<>();
+    for (int i = 0; i < 10; i++) {
+      String instanceName = clusterName + "_localhost_" + (12918 + i);
+      _gSetupTool.addInstanceToCluster(clusterName, instanceName);
+      InstanceConfig instanceConfig =
+          helixDataAccessor.getProperty(helixDataAccessor.keyBuilder().instanceConfig(instanceName));
+      instanceConfig.setDomain("faultDomain=" + i / 2 + ",hostname=" + instanceName);
+      helixDataAccessor.setProperty(helixDataAccessor.keyBuilder().instanceConfig(instanceName), instanceConfig);
+      instances.add(instanceName);
+    }
+    startInstances(clusterName, instances, 10);
+  }
+
   @Test(dependsOnMethods = "testGetClusterTopologyAndFaultZoneMap")
   public void testAddConfigFields() throws IOException {
     System.out.println("Start test :" + TestHelper.getTestMethodName());
@@ -399,19 +500,11 @@ public class TestClusterAccessor extends AbstractTestClass {
         Entity.entity(reason, MediaType.APPLICATION_JSON_TYPE), Response.Status.OK.getStatusCode());
 
     // verify is in maintenance mode
-    String body =
-        get("clusters/" + cluster + "/maintenance", null, Response.Status.OK.getStatusCode(), true);
-    JsonNode node = OBJECT_MAPPER.readTree(body);
-    boolean maintenance =
-        node.get(ClusterAccessor.ClusterProperties.maintenance.name()).booleanValue();
-    Assert.assertTrue(maintenance);
+    Assert.assertTrue(isMaintenanceModeEnabled(cluster));
 
     // Check that we could retrieve maintenance signal correctly
-    String signal = get("clusters/" + cluster + "/controller/maintenanceSignal", null,
-        Response.Status.OK.getStatusCode(), true);
     Map<String, Object> maintenanceSignalMap =
-        OBJECT_MAPPER.readValue(signal, new TypeReference<HashMap<String, Object>>() {
-        });
+        getMapResponseFromRest("clusters/" + cluster + "/controller/maintenanceSignal");
     Assert.assertEquals(maintenanceSignalMap.get("TRIGGERED_BY"), "USER");
     Assert.assertEquals(maintenanceSignalMap.get("REASON"), reason);
     Assert.assertNotNull(maintenanceSignalMap.get("TIMESTAMP"));
@@ -422,10 +515,7 @@ public class TestClusterAccessor extends AbstractTestClass {
         Entity.entity("", MediaType.APPLICATION_JSON_TYPE), Response.Status.OK.getStatusCode());
 
     // verify no longer in maintenance mode
-    body = get("clusters/" + cluster + "/maintenance", null, Response.Status.OK.getStatusCode(), true);
-    node = OBJECT_MAPPER.readTree(body);
-    Assert.assertFalse(
-        node.get(ClusterAccessor.ClusterProperties.maintenance.name()).booleanValue());
+    Assert.assertFalse(isMaintenanceModeEnabled(cluster));
 
     get("clusters/" + cluster + "/controller/maintenanceSignal", null,
         Response.Status.NOT_FOUND.getStatusCode(), false);
@@ -448,11 +538,8 @@ public class TestClusterAccessor extends AbstractTestClass {
     Assert.assertNotNull(leader, "Leader name cannot be null!");
 
     // Get the controller leadership history JSON's last entry
-    String leadershipHistory = get("clusters/" + cluster + "/controller/history", null,
-        Response.Status.OK.getStatusCode(), true);
-    Map<String, Object> leadershipHistoryMap =
-        OBJECT_MAPPER.readValue(leadershipHistory, new TypeReference<HashMap<String, Object>>() {
-        });
+    Map<String, Object> leadershipHistoryMap = getMapResponseFromRest("clusters/" + cluster + "/controller/history");
+
     Assert.assertNotNull(leadershipHistoryMap, "Leadership history cannot be null!");
     Object leadershipHistoryList =
         leadershipHistoryMap.get(AbstractResource.Properties.history.name());
@@ -477,11 +564,8 @@ public class TestClusterAccessor extends AbstractTestClass {
         Entity.entity(reason, MediaType.APPLICATION_JSON_TYPE), Response.Status.OK.getStatusCode());
 
     // Get the maintenance history JSON's last entry
-    String maintenanceHistory = get("clusters/" + cluster + "/controller/maintenanceHistory", null,
-        Response.Status.OK.getStatusCode(), true);
     Map<String, Object> maintenanceHistoryMap =
-        OBJECT_MAPPER.readValue(maintenanceHistory, new TypeReference<HashMap<String, Object>>() {
-        });
+        getMapResponseFromRest("clusters/" + cluster + "/controller/maintenanceHistory");
     Object maintenanceHistoryList =
         maintenanceHistoryMap.get(ClusterAccessor.ClusterProperties.maintenanceHistory.name());
     Assert.assertNotNull(maintenanceHistoryList);
@@ -571,10 +655,7 @@ public class TestClusterAccessor extends AbstractTestClass {
     System.out.println("Start test :" + TestHelper.getTestMethodName());
     String cluster = "TestCluster_1";
     String urlBase = "clusters/TestCluster_1/statemodeldefs/";
-    String stateModelDefs =
-        get(urlBase, null, Response.Status.OK.getStatusCode(), true);
-    Map<String, Object> defMap = OBJECT_MAPPER.readValue(stateModelDefs, new TypeReference<HashMap<String, Object>>() {
-    });
+    Map<String, Object> defMap = getMapResponseFromRest(urlBase);
 
     Assert.assertTrue(defMap.size() == 2);
     Assert.assertTrue(defMap.get("stateModelDefinitions") instanceof List);
@@ -1427,4 +1508,16 @@ public class TestClusterAccessor extends AbstractTestClass {
     Assert.assertEquals(auditLog.getResponseCode(), statusCode);
     Assert.assertEquals(auditLog.getResponseEntity(), responseEntity);
   }
+
+  /**
+   * Issues a GET against {@code uri}, expecting a 200 OK response, and parses the body as a JSON object.
+   *
+   * @param uri REST resource path, e.g. {@code clusters/<cluster>/topology}
+   * @return the response body's top-level JSON fields as a map
+   * @throws JsonProcessingException if the body cannot be parsed as a JSON object
+   */
+  private Map<String, Object> getMapResponseFromRest(String uri) throws JsonProcessingException {
+    TypeReference<HashMap<String, Object>> jsonObjectType = new TypeReference<HashMap<String, Object>>() { };
+    return OBJECT_MAPPER.readValue(get(uri, null, Response.Status.OK.getStatusCode(), true), jsonObjectType);
+  }
 }