You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ne...@apache.org on 2022/01/29 00:23:32 UTC

[helix] branch helix-virtual-group updated: Fix #1946 -- Refactor and move ClusterTopologyConfig

This is an automated email from the ASF dual-hosted git repository.

nealsun pushed a commit to branch helix-virtual-group
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/helix-virtual-group by this push:
     new 3ad69ec  Fix #1946 -- Refactor and move ClusterTopologyConfig
3ad69ec is described below

commit 3ad69ec7e133d08a10bf65a5b75618e856cea6a1
Author: Qi (Quincy) Qu <qq...@linkedin.com>
AuthorDate: Fri Jan 28 16:14:09 2022 -0800

    Fix #1946 -- Refactor and move ClusterTopologyConfig
    
    Move ClusterTopologyConfig from nested to a standalone class in helix/model and to be used by virtual topology group logic.
---
 .../controller/rebalancer/topology/Topology.java   | 120 ++++++---------------
 .../apache/helix/model/ClusterTopologyConfig.java  | 101 +++++++++++++++++
 .../helix/model/TestClusterTopologyConfig.java     |  84 +++++++++++++++
 3 files changed, 216 insertions(+), 89 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
index 3d2a878..f35b637 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
@@ -30,6 +30,7 @@ import java.util.Set;
 
 import org.apache.helix.HelixException;
 import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ClusterTopologyConfig;
 import org.apache.helix.model.InstanceConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,15 +54,7 @@ public class Topology {
   private final List<String> _allInstances;
   private final List<String> _liveInstances;
   private final Map<String, InstanceConfig> _instanceConfigMap;
-  private final ClusterConfig _clusterConfig;
-  private static final String DEFAULT_DOMAIN_PREFIX = "Helix_default_";
-
-  static class ClusterTopologyConfig {
-    String endNodeType;
-    String faultZoneType;
-    LinkedHashMap<String, String> topologyKeyDefaultValue = new LinkedHashMap<>();
-  }
-  private ClusterTopologyConfig _clusterTopologyConfig;
+  private final ClusterTopologyConfig _clusterTopologyConfig;
 
   public Topology(final List<String> allNodes, final List<String> liveNodes,
       final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig) {
@@ -78,18 +71,16 @@ public class Topology {
       throw new HelixException(String.format("Config for instances %s is not found!",
           _allInstances.removeAll(_instanceConfigMap.keySet())));
     }
-    _clusterConfig = clusterConfig;
-    _clusterTopologyConfig = getClusterTopologySetting(clusterConfig);
-
-    _root = createClusterTree();
+    _clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
+    _root = createClusterTree(clusterConfig);
   }
 
   public String getEndNodeType() {
-    return _clusterTopologyConfig.endNodeType;
+    return _clusterTopologyConfig.getEndNodeType();
   }
 
   public String getFaultZoneType() {
-    return _clusterTopologyConfig.faultZoneType;
+    return _clusterTopologyConfig.getFaultZoneType();
   }
 
   public Node getRootNode() {
@@ -158,7 +149,7 @@ public class Topology {
     return newRoot;
   }
 
-  private Node createClusterTree() {
+  private Node createClusterTree(ClusterConfig clusterConfig) {
     // root
     Node root = new Node();
     root.setName("root");
@@ -171,94 +162,46 @@ public class Topology {
       InstanceConfig insConfig = _instanceConfigMap.get(instanceName);
       try {
         LinkedHashMap<String, String> instanceTopologyMap =
-            computeInstanceTopologyMapHelper(_clusterConfig.isTopologyAwareEnabled(), instanceName,
-                insConfig, _clusterTopologyConfig.topologyKeyDefaultValue,
-                null /*faultZoneForEarlyQuit*/);
+            computeInstanceTopologyMapHelper(_clusterTopologyConfig, instanceName, insConfig, null);
         int weight = insConfig.getWeight();
         if (weight < 0 || weight == InstanceConfig.WEIGHT_NOT_SET) {
           weight = DEFAULT_NODE_WEIGHT;
         }
         addEndNode(root, instanceName, instanceTopologyMap, weight, _liveInstances);
       } catch (IllegalArgumentException e) {
-        if (isInstanceEnabled(_clusterConfig, instanceName, insConfig)) {
+        if (isInstanceEnabled(clusterConfig, instanceName, insConfig)) {
           throw e;
         } else {
-          logger
-              .warn("Topology setting {} for instance {} is unset or invalid, ignore the instance!",
-                  insConfig.getDomainAsString(), instanceName);
+          logger.warn("Topology setting {} for instance {} is unset or invalid, ignore the instance!",
+              insConfig.getDomainAsString(), instanceName);
         }
       }
     }
     return root;
   }
 
-  private boolean isInstanceEnabled(ClusterConfig clusterConfig, String instanceName,
+  private static boolean isInstanceEnabled(ClusterConfig clusterConfig, String instanceName,
       InstanceConfig instanceConfig) {
     return (instanceConfig.getInstanceEnabled() && (clusterConfig.getDisabledInstances() == null
         || !clusterConfig.getDisabledInstances().containsKey(instanceName)));
   }
 
   /**
-   * Populate faultZone, endNodetype and and a LinkedHashMap containing pathKeys default values for
-   * clusterConfig.Topology. The LinkedHashMap will be empty if clusterConfig.Topology is unset.
-   *
-   * @return an Instance of Topology.ClusterTopologyConfig.
-   */
-  private static ClusterTopologyConfig getClusterTopologySetting(ClusterConfig clusterConfig) {
-
-    ClusterTopologyConfig clusterTopologyConfig = new ClusterTopologyConfig();
-    if (clusterConfig.isTopologyAwareEnabled()) {
-      String topologyDef = clusterConfig.getTopology();
-      if (topologyDef != null) {
-        String[] topologyKeys = topologyDef.trim().split("/");
-        int lastValidTypeIdx = 0;
-        for (int i = 0; i < topologyKeys.length; i++) {
-          if (topologyKeys[i].length() != 0) {
-            clusterTopologyConfig.topologyKeyDefaultValue
-                .put(topologyKeys[i], DEFAULT_DOMAIN_PREFIX + topologyKeys[i]);
-            lastValidTypeIdx = i;
-          }
-        }
-        if (clusterTopologyConfig.topologyKeyDefaultValue.size() == 0) {
-          throw new IllegalArgumentException("Invalid cluster topology definition " + topologyDef);
-        }
-        clusterTopologyConfig.endNodeType = topologyKeys[lastValidTypeIdx];
-        String faultZoneType = clusterConfig.getFaultZoneType();
-        if (faultZoneType == null) {
-          clusterTopologyConfig.faultZoneType = clusterTopologyConfig.endNodeType;
-        } else if (!clusterTopologyConfig.topologyKeyDefaultValue.containsKey(faultZoneType)) {
-          throw new HelixException(String
-              .format("Invalid fault zone type %s, not present in topology definition %s.",
-                  faultZoneType, clusterConfig.getTopology()));
-        } else {
-          clusterTopologyConfig.faultZoneType = faultZoneType;
-        }
-      } else {
-        // Use default cluster topology definition, i,e. /root/zone/instance
-        clusterTopologyConfig.endNodeType = Types.INSTANCE.name();
-        clusterTopologyConfig.faultZoneType = Types.ZONE.name();
-      }
-    } else {
-      clusterTopologyConfig.endNodeType = Types.INSTANCE.name();
-      clusterTopologyConfig.faultZoneType = Types.INSTANCE.name();
-    }
-    return clusterTopologyConfig;
-  }
-
-  /**
-   * @param clusterTopologyKeyDefaultValue  a LinkedHashMap where keys are cluster topology path and
-   *                                       values are their corresponding default value. The entries
-   *                                        are ordered by ClusterConfig.topology setting.
-   * @param faultZoneForEarlyQuit   this flag is set to true only if caller wants the path
-   *                                to faultZone instead the whole path for the instance.
+   * Construct the instance topology map for an instance.
+   * The mapping is the cluster topology path name to its corresponding value.
+   * @param clusterTopologyConfig
+   * @param instanceName
+   * @param instanceConfig
+   * @param faultZoneForEarlyQuit  Nullable, if set to non-null value, the faultZone path will stop at the matched
+   *                               faultZone value instead of constructing the whole path for the instance.
    */
   private static LinkedHashMap<String, String> computeInstanceTopologyMapHelper(
-      boolean isTopologyAwareEnabled, String instanceName, InstanceConfig instanceConfig,
-      LinkedHashMap<String, String> clusterTopologyKeyDefaultValue, String faultZoneForEarlyQuit)
+      ClusterTopologyConfig clusterTopologyConfig, String instanceName, InstanceConfig instanceConfig,
+      String faultZoneForEarlyQuit)
       throws IllegalArgumentException {
     LinkedHashMap<String, String> instanceTopologyMap = new LinkedHashMap<>();
-    if (isTopologyAwareEnabled) {
-      if (clusterTopologyKeyDefaultValue.size() == 0) {
+    if (clusterTopologyConfig.isTopologyAwareEnabled()) {
+      if (clusterTopologyConfig.getTopologyKeyDefaultValue().isEmpty()) {
         // Return a ordered map using default cluster topology definition, i,e. /root/zone/instance
         String zone = instanceConfig.getZoneId();
         if (zone == null) {
@@ -283,11 +226,11 @@ public class Topology {
                   instanceName));
         }
         int numOfMatchedKeys = 0;
-        for (String key : clusterTopologyKeyDefaultValue.keySet()) {
+        for (String key : clusterTopologyConfig.getTopologyKeyDefaultValue().keySet()) {
           // if a key does not exist in the instance domain config, using the default domain value.
           String value = domainAsMap.get(key);
           if (value == null || value.length() == 0) {
-            value = clusterTopologyKeyDefaultValue.get(key);
+            value = clusterTopologyConfig.getTopologyKeyDefaultValue().get(key);
           } else {
             numOfMatchedKeys++;
           }
@@ -300,7 +243,7 @@ public class Topology {
           logger.warn(
               "Key-value pairs in InstanceConfig.Domain {} do not align with keys in ClusterConfig.Topology "
                   + "{}, using default domain value instead", instanceConfig.getDomainAsString(),
-              clusterTopologyKeyDefaultValue.keySet());
+              clusterTopologyConfig.getTopologyKeyDefaultValue().keySet());
         }
       }
     } else {
@@ -327,11 +270,10 @@ public class Topology {
   public static LinkedHashMap<String, String> computeInstanceTopologyMap(
       ClusterConfig clusterConfig, String instanceName, InstanceConfig instanceConfig,
       boolean earlyQuitForFaultZone) {
-    ClusterTopologyConfig clusterTopologyConfig = getClusterTopologySetting(clusterConfig);
+    ClusterTopologyConfig clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
     String faultZoneForEarlyQuit =
-        earlyQuitForFaultZone ? clusterTopologyConfig.faultZoneType : null;
-    return computeInstanceTopologyMapHelper(clusterConfig.isTopologyAwareEnabled(), instanceName,
-        instanceConfig, clusterTopologyConfig.topologyKeyDefaultValue, faultZoneForEarlyQuit);
+        earlyQuitForFaultZone ? clusterTopologyConfig.getFaultZoneType() : null;
+    return computeInstanceTopologyMapHelper(clusterTopologyConfig, instanceName, instanceConfig, faultZoneForEarlyQuit);
   }
 
   /**
@@ -349,7 +291,7 @@ public class Topology {
       if (!current.hasChild(pathValue)) {
         buildNewNode(pathValue, path, current, instanceName, instanceWeight,
             liveInstances.contains(instanceName), pathNodes);
-      } else if (path.equals(_clusterTopologyConfig.endNodeType)) {
+      } else if (path.equals(_clusterTopologyConfig.getEndNodeType())) {
         throw new HelixException(
             "Failed to add topology node because duplicate leaf nodes are not allowed. Duplicate node name: "
                 + pathValue);
@@ -366,7 +308,7 @@ public class Topology {
     n.setType(type);
     n.setParent(parent);
     // if it is leaf node, create an InstanceNode instead
-    if (type.equals(_clusterTopologyConfig.endNodeType)) {
+    if (type.equals(_clusterTopologyConfig.getEndNodeType())) {
       n = new InstanceNode(n, instanceName);
       if (isLiveInstance) {
         // node is alive
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterTopologyConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterTopologyConfig.java
new file mode 100644
index 0000000..f7ae740
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterTopologyConfig.java
@@ -0,0 +1,101 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.LinkedHashMap;
+import org.apache.helix.HelixException;
+import org.apache.helix.controller.rebalancer.topology.Topology;
+
+
+public class ClusterTopologyConfig {
+  private static final String DEFAULT_DOMAIN_PREFIX = "Helix_default_";
+  private static final String TOPOLOGY_SPLITTER = "/";
+
+  private final boolean _topologyAwareEnabled;
+  private final String _endNodeType;
+  private final String _faultZoneType;
+  private final LinkedHashMap<String, String> _topologyKeyDefaultValue;
+
+  private ClusterTopologyConfig(boolean topologyAwareEnabled, String endNodeType, String faultZoneType,
+      LinkedHashMap<String, String> topologyKeyDefaultValue) {
+    _topologyAwareEnabled = topologyAwareEnabled;
+    _endNodeType = endNodeType;
+    _faultZoneType = faultZoneType;
+    _topologyKeyDefaultValue = topologyKeyDefaultValue;
+  }
+
+  /**
+   * Populate faultZone, endNodetype and and a LinkedHashMap containing pathKeys default values for
+   * clusterConfig.Topology. The LinkedHashMap will be empty if clusterConfig.Topology is unset.
+   *
+   * @return an instance of {@link ClusterTopologyConfig}
+   */
+  public static ClusterTopologyConfig createFromClusterConfig(ClusterConfig clusterConfig) {
+    if (!clusterConfig.isTopologyAwareEnabled()) {
+      return new ClusterTopologyConfig(
+          false,
+          Topology.Types.INSTANCE.name(),
+          Topology.Types.INSTANCE.name(),
+          new LinkedHashMap<>());
+    }
+    // Assign default cluster topology definition, i,e. /root/zone/instance
+    String endNodeType = Topology.Types.INSTANCE.name();
+    String faultZoneType = Topology.Types.ZONE.name();
+    LinkedHashMap<String, String> topologyKeyDefaultValue = new LinkedHashMap<>();
+
+    String topologyDef = clusterConfig.getTopology();
+    if (topologyDef != null) {
+      for (String topologyKey : topologyDef.trim().split(TOPOLOGY_SPLITTER)) {
+        if (!topologyKey.isEmpty()) {
+          topologyKeyDefaultValue.put(topologyKey, DEFAULT_DOMAIN_PREFIX + topologyKey);
+          endNodeType = topologyKey;
+        }
+      }
+      if (topologyKeyDefaultValue.isEmpty()) {
+        throw new IllegalArgumentException("Invalid cluster topology definition " + topologyDef);
+      }
+      faultZoneType = clusterConfig.getFaultZoneType();
+      if (faultZoneType == null) {
+        faultZoneType = endNodeType;
+      } else if (!topologyKeyDefaultValue.containsKey(faultZoneType)) {
+        throw new HelixException(
+            String.format("Invalid fault zone type %s, not present in topology definition %s.",
+                faultZoneType, clusterConfig.getTopology()));
+      }
+    }
+    return new ClusterTopologyConfig(true, endNodeType, faultZoneType, topologyKeyDefaultValue);
+  }
+
+  public boolean isTopologyAwareEnabled() {
+    return _topologyAwareEnabled;
+  }
+
+  public String getEndNodeType() {
+    return _endNodeType;
+  }
+
+  public String getFaultZoneType() {
+    return _faultZoneType;
+  }
+
+  public LinkedHashMap<String, String> getTopologyKeyDefaultValue() {
+    return _topologyKeyDefaultValue;
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestClusterTopologyConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestClusterTopologyConfig.java
new file mode 100644
index 0000000..8235312
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterTopologyConfig.java
@@ -0,0 +1,84 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Iterator;
+import org.apache.helix.HelixException;
+import org.apache.helix.controller.rebalancer.topology.Topology;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestClusterTopologyConfig {
+
+  @Test
+  public void testClusterNonTopologyAware() {
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    testConfig.setTopologyAwareEnabled(false);
+    ClusterTopologyConfig clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(testConfig);
+    Assert.assertEquals(clusterTopologyConfig.getEndNodeType(), Topology.Types.INSTANCE.name());
+    Assert.assertEquals(clusterTopologyConfig.getFaultZoneType(), Topology.Types.INSTANCE.name());
+    Assert.assertTrue(clusterTopologyConfig.getTopologyKeyDefaultValue().isEmpty());
+  }
+
+  @Test
+  public void testClusterValidTopology() {
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    testConfig.setTopologyAwareEnabled(true);
+    testConfig.setTopology("/zone/instance");
+    // no fault zone setup
+    ClusterTopologyConfig clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(testConfig);
+    Assert.assertEquals(clusterTopologyConfig.getEndNodeType(), "instance");
+    Assert.assertEquals(clusterTopologyConfig.getFaultZoneType(), "instance");
+    Assert.assertEquals(clusterTopologyConfig.getTopologyKeyDefaultValue().size(), 2);
+    // with fault zone
+    testConfig.setFaultZoneType("zone");
+    testConfig.setTopology(" /zone/instance  ");
+    clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(testConfig);
+    Assert.assertEquals(clusterTopologyConfig.getEndNodeType(), "instance");
+    Assert.assertEquals(clusterTopologyConfig.getFaultZoneType(), "zone");
+    Assert.assertEquals(clusterTopologyConfig.getTopologyKeyDefaultValue().size(), 2);
+    String[] keys = new String[] {"zone", "instance"};
+    Iterator<String> itr = clusterTopologyConfig.getTopologyKeyDefaultValue().keySet().iterator();
+    for (String k : keys) {
+      Assert.assertEquals(k, itr.next());
+    }
+
+    testConfig.setTopology("/rack/zone/instance");
+    clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(testConfig);
+    Assert.assertEquals(clusterTopologyConfig.getEndNodeType(), "instance");
+    Assert.assertEquals(clusterTopologyConfig.getFaultZoneType(), "zone");
+    Assert.assertEquals(clusterTopologyConfig.getTopologyKeyDefaultValue().size(), 3);
+    keys = new String[] {"rack", "zone", "instance"};
+    itr = clusterTopologyConfig.getTopologyKeyDefaultValue().keySet().iterator();
+    for (String k : keys) {
+      Assert.assertEquals(k, itr.next());
+    }
+  }
+
+  @Test(expectedExceptions = HelixException.class)
+  public void testClusterInvalidTopology() {
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    testConfig.setTopologyAwareEnabled(true);
+    testConfig.setTopology("/zone/instance");
+    testConfig.setFaultZoneType("rack");
+    ClusterTopologyConfig.createFromClusterConfig(testConfig);
+  }
+}