You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2020/08/12 18:35:55 UTC

[helix] branch master updated: Validate instance topology configuration before let it comes online (#1129)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 0869e72  Validate instance topology configuration before let it comes online  (#1129)
0869e72 is described below

commit 0869e729530d83ba55228b2d44a79ae44175c081
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Wed Aug 12 11:35:45 2020 -0700

    Validate instance topology configuration before let it comes online  (#1129)
    
    Helix does not check whether an instance's topology config is defined correctly when the instance is added to or joins the cluster. An invalid topology config causes Helix to fail the rebalance. This change adds a sanity check at the following points:
    
    - When an instance joins the cluster as a liveInstance
    - When a user updates InstanceConfig using the REST API
    - When the controller prepares the topology structure for rebalance
    This change also does some code cleanup and centralizes the logic for the topology sanity check.
---
 .../main/java/org/apache/helix/ConfigAccessor.java |   2 +-
 .../controller/rebalancer/topology/Topology.java   | 163 +++++++++++++--------
 .../rebalancer/waged/model/AssignableNode.java     |  48 ++----
 .../helix/manager/zk/ParticipantManager.java       |  32 ++--
 .../org/apache/helix/model/InstanceConfig.java     |  16 ++
 .../rebalancer/waged/model/TestAssignableNode.java |   2 +-
 .../manager/TestParticipantManager.java            |  49 +++++++
 .../resources/helix/PerInstanceAccessor.java       |  56 +++++--
 .../helix/rest/server/TestPerInstanceAccessor.java |  53 +++++++
 9 files changed, 300 insertions(+), 121 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
index be56769..bc271c5 100644
--- a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
@@ -945,7 +945,7 @@ public class ConfigAccessor {
    * replaced with the value of the same field in given config if it presents. If there is new field
    * in given config but not in current config, the field will be added into the current config..
    * The list fields and map fields will be replaced as a single entry.
-   * The current Cluster config will be replaced with the given clusterConfig. WARNING: This is not
+   * The current instanceConfig will be replaced with the given instanceConfig. WARNING: This is not
    * thread-safe or concurrent updates safe.
    * *
    *
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
index fad7897..3bc2e3a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
@@ -23,9 +23,7 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -58,10 +56,12 @@ public class Topology {
   private final ClusterConfig _clusterConfig;
   private static final String DEFAULT_DOMAIN_PREFIX = "Helix_default_";
 
-  private String _faultZoneType;
-  private String _endNodeType;
-  private LinkedHashSet<String> _clusterTopologyKeys;
-  private Map<String, String> _defaultDomainPathValues = new HashMap<>();
+  static class ClusterTopologyConfig {
+    String endNodeType;
+    String faultZoneType;
+    LinkedHashMap<String, String> topologyKeyDefaultValue = new LinkedHashMap<>();
+  }
+  private ClusterTopologyConfig _clusterTopologyConfig;
 
   public Topology(final List<String> allNodes, final List<String> liveNodes,
       final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig) {
@@ -79,51 +79,17 @@ public class Topology {
           _allInstances.removeAll(_instanceConfigMap.keySet())));
     }
     _clusterConfig = clusterConfig;
-    _clusterTopologyKeys = new LinkedHashSet<>();
+    _clusterTopologyConfig = getClusterTopologySetting(clusterConfig);
 
-    if (clusterConfig.isTopologyAwareEnabled()) {
-      String topologyDef = _clusterConfig.getTopology();
-      if (topologyDef != null) {
-        // Customized cluster topology definition is configured.
-        String[] topologyKeys = topologyDef.trim().split("/");
-        int lastValidTypeIdx = 0;
-        for (int i = 0; i < topologyKeys.length; i++) {
-          if (topologyKeys[i].length() != 0) {
-            _clusterTopologyKeys.add(topologyKeys[i]);
-            _defaultDomainPathValues.put(topologyKeys[i], DEFAULT_DOMAIN_PREFIX + topologyKeys[i]);
-            lastValidTypeIdx = i;
-          }
-        }
-        if (_clusterTopologyKeys.size() == 0) {
-          throw new HelixException("Invalid cluster topology definition " + topologyDef);
-        }
-        _endNodeType = topologyKeys[lastValidTypeIdx];
-        _faultZoneType = clusterConfig.getFaultZoneType();
-        if (_faultZoneType == null) {
-          _faultZoneType = _endNodeType;
-        } else if (!_clusterTopologyKeys.contains(_faultZoneType)) {
-          throw new HelixException(String
-              .format("Invalid fault zone type %s, not present in topology definition %s.",
-                  _faultZoneType, topologyDef));
-        }
-      } else {
-        // Use default cluster topology definition, i,e. /root/zone/instance
-        _endNodeType = Types.INSTANCE.name();
-        _faultZoneType = Types.ZONE.name();
-      }
-    } else {
-      _endNodeType = Types.INSTANCE.name();
-      _faultZoneType = Types.INSTANCE.name();
-    }
     _root = createClusterTree();
   }
 
   public String getEndNodeType() {
-    return _endNodeType;
+    return _clusterTopologyConfig.endNodeType;
   }
 
   public String getFaultZoneType() {
-    return _faultZoneType;
+    return _clusterTopologyConfig.faultZoneType;
   }
 
   public Node getRootNode() {
@@ -169,7 +135,8 @@ public class Topology {
     return newRoot;
   }
 
-  private static Node cloneTree(Node root, Map<Node, Integer> newNodeWeight, Set<Node> failedNodes) {
+  private static Node cloneTree(Node root, Map<Node, Integer> newNodeWeight,
+      Set<Node> failedNodes) {
     Node newRoot = root.clone();
     if (newNodeWeight.containsKey(root)) {
       newRoot.setWeight(newNodeWeight.get(root));
@@ -204,8 +171,9 @@ public class Topology {
       InstanceConfig insConfig = _instanceConfigMap.get(instanceName);
       try {
         LinkedHashMap<String, String> instanceTopologyMap =
-            computeInstanceTopologyMap(_clusterConfig.isTopologyAwareEnabled(), instanceName,
-                insConfig, _clusterTopologyKeys);
+            computeInstanceTopologyMapHelper(_clusterConfig.isTopologyAwareEnabled(), instanceName,
+                insConfig, _clusterTopologyConfig.topologyKeyDefaultValue,
+                null /*faultZoneForEarlyQuit*/);
         int weight = insConfig.getWeight();
         if (weight < 0 || weight == InstanceConfig.WEIGHT_NOT_SET) {
           weight = DEFAULT_NODE_WEIGHT;
@@ -231,19 +199,66 @@ public class Topology {
   }
 
   /**
-   * This function returns a LinkedHashMap<String, String> object representing
-   * the topology path for an instance.
-   * LinkedHashMap is used here since the order of the path needs to be preserved
-   * when creating the topology tree.
+   * Populate faultZone, endNodetype and and a LinkedHashMap containing pathKeys default values for
+   * clusterConfig.Topology. The LinkedHashMap will be empty if clusterConfig.Topology is unset.
    *
-   * @return an LinkedHashMap object representing the topology path for the input instance.
+   * @return an Instance of Topology.ClusterTopologyConfig.
    */
-  private LinkedHashMap<String, String> computeInstanceTopologyMap(boolean isTopologyAwareEnabled,
-      String instanceName, InstanceConfig instanceConfig, LinkedHashSet<String> clusterTopologyKeys)
+  private static ClusterTopologyConfig getClusterTopologySetting(ClusterConfig clusterConfig) {
+
+    ClusterTopologyConfig clusterTopologyConfig = new ClusterTopologyConfig();
+    if (clusterConfig.isTopologyAwareEnabled()) {
+      String topologyDef = clusterConfig.getTopology();
+      if (topologyDef != null) {
+        String[] topologyKeys = topologyDef.trim().split("/");
+        int lastValidTypeIdx = 0;
+        for (int i = 0; i < topologyKeys.length; i++) {
+          if (topologyKeys[i].length() != 0) {
+            clusterTopologyConfig.topologyKeyDefaultValue
+                .put(topologyKeys[i], DEFAULT_DOMAIN_PREFIX + topologyKeys[i]);
+            lastValidTypeIdx = i;
+          }
+        }
+        if (clusterTopologyConfig.topologyKeyDefaultValue.size() == 0) {
+          throw new IllegalArgumentException("Invalid cluster topology definition " + topologyDef);
+        }
+        clusterTopologyConfig.endNodeType = topologyKeys[lastValidTypeIdx];
+        String faultZoneType = clusterConfig.getFaultZoneType();
+        if (faultZoneType == null) {
+          clusterTopologyConfig.faultZoneType = clusterTopologyConfig.endNodeType;
+        } else if (!clusterTopologyConfig.topologyKeyDefaultValue.containsKey(faultZoneType)) {
+          throw new HelixException(String
+              .format("Invalid fault zone type %s, not present in topology definition %s.",
+                  faultZoneType, clusterConfig.getTopology()));
+        } else {
+          clusterTopologyConfig.faultZoneType = faultZoneType;
+        }
+      } else {
+        // Use default cluster topology definition, i,e. /root/zone/instance
+        clusterTopologyConfig.endNodeType = Types.INSTANCE.name();
+        clusterTopologyConfig.faultZoneType = Types.ZONE.name();
+      }
+    } else {
+      clusterTopologyConfig.endNodeType = Types.INSTANCE.name();
+      clusterTopologyConfig.faultZoneType = Types.INSTANCE.name();
+    }
+    return clusterTopologyConfig;
+  }
+
+  /**
+   * @param clusterTopologyKeyDefaultValue  a LinkedHashMap where keys are cluster topology path and
+   *                                       values are their corresponding default value. The entries
+   *                                        are ordered by ClusterConfig.topology setting.
+   * @param faultZoneForEarlyQuit   this flag is set to true only if caller wants the path
+   *                                to faultZone instead the whole path for the instance.
+   */
+  private static LinkedHashMap<String, String> computeInstanceTopologyMapHelper(
+      boolean isTopologyAwareEnabled, String instanceName, InstanceConfig instanceConfig,
+      LinkedHashMap<String, String> clusterTopologyKeyDefaultValue, String faultZoneForEarlyQuit)
       throws IllegalArgumentException {
     LinkedHashMap<String, String> instanceTopologyMap = new LinkedHashMap<>();
     if (isTopologyAwareEnabled) {
-      if (clusterTopologyKeys.size() == 0) {
+      if (clusterTopologyKeyDefaultValue.size() == 0) {
         // Return a ordered map using default cluster topology definition, i,e. /root/zone/instance
         String zone = instanceConfig.getZoneId();
         if (zone == null) {
@@ -252,6 +267,9 @@ public class Topology {
                   instanceName));
         }
         instanceTopologyMap.put(Types.ZONE.name(), zone);
+        if (faultZoneForEarlyQuit != null) {
+          return instanceTopologyMap;
+        }
         instanceTopologyMap.put(Types.INSTANCE.name(), instanceName);
       } else {
         /*
@@ -265,21 +283,24 @@ public class Topology {
                   instanceName));
         }
         int numOfMatchedKeys = 0;
-        for (String key : clusterTopologyKeys) {
+        for (String key : clusterTopologyKeyDefaultValue.keySet()) {
           // if a key does not exist in the instance domain config, using the default domain value.
           String value = domainAsMap.get(key);
           if (value == null || value.length() == 0) {
-            value = _defaultDomainPathValues.get(key);
+            value = clusterTopologyKeyDefaultValue.get(key);
           } else {
             numOfMatchedKeys++;
           }
           instanceTopologyMap.put(key, value);
+          if (key.equals(faultZoneForEarlyQuit)) {
+            return instanceTopologyMap;
+          }
         }
         if (numOfMatchedKeys != domainAsMap.size()) {
           logger.warn(
               "Key-value pairs in InstanceConfig.Domain {} do not align with keys in ClusterConfig.Topology "
                   + "{}, using default domain value instead", instanceConfig.getDomainAsString(),
-              clusterTopologyKeys.toString());
+              clusterTopologyKeyDefaultValue.keySet());
         }
       }
     } else {
@@ -290,6 +311,30 @@ public class Topology {
   }
 
   /**
+   * This function returns a LinkedHashMap<String, String> object representing
+   * the topology path for an instance.
+   * LinkedHashMap is used here since the order of the path needs to be preserved
+   * when creating the topology tree.
+   *
+   * @param clusterConfig         clusterConfig of the given cluster.
+   * @param instanceName          name of the instance.
+   * @param instanceConfig        instanceConfig to be checked.
+   * @param earlyQuitForFaultZone Set to true if we only need the path till faultZone.
+   *
+   * @return an LinkedHashMap object representing the topology path for the input instance.
+   * @throws IllegalArgumentException if input is not valid.
+   */
+  public static LinkedHashMap<String, String> computeInstanceTopologyMap(
+      ClusterConfig clusterConfig, String instanceName, InstanceConfig instanceConfig,
+      boolean earlyQuitForFaultZone) {
+    ClusterTopologyConfig clusterTopologyConfig = getClusterTopologySetting(clusterConfig);
+    String faultZoneForEarlyQuit =
+        earlyQuitForFaultZone ? clusterTopologyConfig.faultZoneType : null;
+    return computeInstanceTopologyMapHelper(clusterConfig.isTopologyAwareEnabled(), instanceName,
+        instanceConfig, clusterTopologyConfig.topologyKeyDefaultValue, faultZoneForEarlyQuit);
+  }
+
+  /**
    * Add an end node to the tree, create all the paths to the leaf node if not present.
    */
   private void addEndNode(Node root, String instanceName, LinkedHashMap<String, String> pathNameMap,
@@ -304,7 +349,7 @@ public class Topology {
       if (!current.hasChild(pathValue)) {
         buildNewNode(pathValue, path, current, instanceName, instanceWeight,
             liveInstances.contains(instanceName), pathNodes);
-      } else if (path.equals(_endNodeType)) {
+      } else if (path.equals(_clusterTopologyConfig.endNodeType)) {
         throw new HelixException(
             "Failed to add topology node because duplicate leaf nodes are not allowed. Duplicate node name: "
                 + pathValue);
@@ -321,7 +366,7 @@ public class Topology {
     n.setType(type);
     n.setParent(parent);
     // if it is leaf node, create an InstanceNode instead
-    if (type.equals(_endNodeType)) {
+    if (type.equals(_clusterTopologyConfig.endNodeType)) {
       n = new InstanceNode(n, instanceName);
       if (isLiveInstance) {
         // node is alive
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index a8a45ce..d3d014d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -31,6 +32,7 @@ import java.util.stream.Collectors;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.helix.HelixException;
+import org.apache.helix.controller.rebalancer.topology.Topology;
 import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.InstanceConfig;
@@ -278,43 +280,17 @@ public class AssignableNode implements Comparable<AssignableNode> {
    * simpler than the CRUSH based rebalancer.
    */
   private String computeFaultZone(ClusterConfig clusterConfig, InstanceConfig instanceConfig) {
-    if (!clusterConfig.isTopologyAwareEnabled()) {
-      // Instance name is the default fault zone if topology awareness is false.
-      return instanceConfig.getInstanceName();
-    }
-    String topologyStr = clusterConfig.getTopology();
-    String faultZoneType = clusterConfig.getFaultZoneType();
-    if (topologyStr == null || faultZoneType == null) {
-      LOG.debug("Topology configuration is not complete. Topology define: {}, Fault Zone Type: {}",
-          topologyStr, faultZoneType);
-      // Use the instance name, or the deprecated ZoneId field (if exists) as the default fault
-      // zone.
-      String zoneId = instanceConfig.getZoneId();
-      return zoneId == null ? instanceConfig.getInstanceName() : zoneId;
-    } else {
-      // Get the fault zone information from the complete topology definition.
-      String[] topologyKeys = topologyStr.trim().split("/");
-      if (topologyKeys.length == 0 || Arrays.stream(topologyKeys)
-          .noneMatch(type -> type.equals(faultZoneType))) {
-        throw new HelixException(
-            "The configured topology definition is empty or does not contain the fault zone type.");
-      }
-
-      Map<String, String> domainAsMap = instanceConfig.getDomainAsMap();
-      StringBuilder faultZoneStringBuilder = new StringBuilder();
-      for (String key : topologyKeys) {
-        if (!key.isEmpty()) {
-          // if a key does not exist in the instance domain config, apply the default domain value.
-          faultZoneStringBuilder.append(domainAsMap.getOrDefault(key, "Default_" + key));
-          if (key.equals(faultZoneType)) {
-            break;
-          } else {
-            faultZoneStringBuilder.append('/');
-          }
-        }
-      }
-      return faultZoneStringBuilder.toString();
+    LinkedHashMap<String, String> instanceTopologyMap = Topology
+        .computeInstanceTopologyMap(clusterConfig, instanceConfig.getInstanceName(), instanceConfig,
+            true /*earlyQuitTillFaultZone*/);
+
+    StringBuilder faultZoneStringBuilder = new StringBuilder();
+    for (Map.Entry<String, String> entry : instanceTopologyMap.entrySet()) {
+      faultZoneStringBuilder.append(entry.getValue());
+      faultZoneStringBuilder.append('/');
     }
+    faultZoneStringBuilder.setLength(faultZoneStringBuilder.length() - 1);
+    return faultZoneStringBuilder.toString();
   }
 
   /**
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index b50617c..33e95a1 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -188,25 +188,31 @@ public class ParticipantManager {
       LOG.info("auto registration is false for cluster" + _clusterName);
     }
 
+    InstanceConfig instanceConfig;
     if (!ZKUtil.isInstanceSetup(_zkclient, _clusterName, _instanceName, _instanceType)) {
       if (!autoJoin) {
         throw new HelixException("Initial cluster structure is not set up for instance: "
             + _instanceName + ", instanceType: " + _instanceType);
+      }
+      if (!autoRegistration) {
+        LOG.info(_instanceName + " is auto-joining cluster: " + _clusterName);
+        instanceConfig = HelixUtil.composeInstanceConfig(_instanceName);
       } else {
-        if (!autoRegistration) {
-          LOG.info(_instanceName + " is auto-joining cluster: " + _clusterName);
-          _helixAdmin.addInstance(_clusterName, HelixUtil.composeInstanceConfig(_instanceName));
-        } else {
-          LOG.info(_instanceName + " is auto-registering cluster: " + _clusterName);
-          CloudInstanceInformation cloudInstanceInformation = getCloudInstanceInformation();
-          String domain = cloudInstanceInformation
-              .get(CloudInstanceInformation.CloudInstanceField.FAULT_DOMAIN.name()) + _instanceName;
-
-          InstanceConfig instanceConfig = HelixUtil.composeInstanceConfig(_instanceName);
-          instanceConfig.setDomain(domain);
-          _helixAdmin.addInstance(_clusterName, instanceConfig);
-        }
+        LOG.info(_instanceName + " is auto-registering cluster: " + _clusterName);
+        CloudInstanceInformation cloudInstanceInformation = getCloudInstanceInformation();
+        String domain = cloudInstanceInformation
+            .get(CloudInstanceInformation.CloudInstanceField.FAULT_DOMAIN.name()) + _instanceName;
+        instanceConfig = HelixUtil.composeInstanceConfig(_instanceName);
+        instanceConfig.setDomain(domain);
       }
+      instanceConfig
+          .validateTopologySettingInInstanceConfig(_configAccessor.getClusterConfig(_clusterName),
+              _instanceName);
+      _helixAdmin.addInstance(_clusterName, instanceConfig);
+    } else {
+      _configAccessor.getInstanceConfig(_clusterName, _instanceName)
+          .validateTopologySettingInInstanceConfig(_configAccessor.getClusterConfig(_clusterName),
+              _instanceName);
     }
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index 70cfbc6..cbcab29 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
 
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
+import org.apache.helix.controller.rebalancer.topology.Topology;
 import org.apache.helix.util.HelixUtil;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.slf4j.Logger;
@@ -635,4 +636,19 @@ public class InstanceConfig extends HelixProperty {
     }
     return config;
   }
+
+  /**
+   * Validate if the topology related settings (Domain or ZoneId) in the given instanceConfig
+   * are valid and align with current clusterConfig.
+   * This function should be called when instance added to cluster or caller updates instanceConfig.
+   *
+   * @throws IllegalArgumentException
+   */
+  public boolean validateTopologySettingInInstanceConfig(ClusterConfig clusterConfig,
+      String instanceName) {
+    //IllegalArgumentException will be thrown here if the input is not valid.
+    Topology.computeInstanceTopologyMap(clusterConfig, instanceName, this,
+        false /*earlyQuitForFaultZone*/);
+    return true;
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
index 1be355e..0245ffa 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
@@ -211,7 +211,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
 
     AssignableNode node = new AssignableNode(testCache.getClusterConfig(),
         testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
-    Assert.assertEquals(node.getFaultZone(), "Default_zone");
+    Assert.assertEquals(node.getFaultZone(), "Helix_default_zone");
   }
 
   @Test
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
index 753c217..7f747d6 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
@@ -21,6 +21,7 @@ package org.apache.helix.integration.manager;
 
 import java.lang.management.ManagementFactory;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -30,6 +31,7 @@ import javax.management.MalformedObjectNameException;
 import javax.management.ObjectInstance;
 import javax.management.ObjectName;
 
+import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
@@ -37,6 +39,8 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.TestHelper;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.ZkTestHelper;
 import org.apache.helix.common.ZkTestBase;
@@ -114,6 +118,51 @@ public class TestParticipantManager extends ZkTestBase {
     Assert.assertNull(accessor.getProperty(keyBuilder.controllerLeader()));
   }
 
+  @Test
+  public void simpleIntegrationTestNeg() throws Exception {
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        4, // partitions per resource
+        1, // number of nodes
+        1, // replicas
+        "MasterSlave", true); // do rebalance
+
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+    clusterConfig.getRecord()
+        .setListField(ClusterConfig.ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name(),
+            new ArrayList<>());
+    clusterConfig.setTopologyAwareEnabled(true);
+    clusterConfig.setTopology("/Rack/Sub-Rack/Host/Instance");
+    clusterConfig.setFaultZoneType("Host");
+    configAccessor.setClusterConfig(clusterName, clusterConfig);
+
+
+    String instanceName = "localhost_12918";
+    HelixManager participant =
+        new ZKHelixManager(clusterName, instanceName , InstanceType.PARTICIPANT, ZK_ADDR);
+    participant.getStateMachineEngine().registerStateModelFactory("MasterSlave",
+        new MockMSModelFactory());
+    // We are expecting an IllegalArgumentException since the domain is not set.
+    try {
+      participant.connect();
+      Assert.fail();  // connect will throw exception. The assertion will never be reached.
+    } catch (IllegalArgumentException expected) {
+      Assert.assertEquals(expected.getMessage(),
+          "Domain for instance localhost_12918 is not set, fail the topology-aware placement!");
+    }
+
+    // verify there is no live-instances created
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance("localhost_12918")));
+    Assert.assertNull(accessor.getProperty(keyBuilder.controllerLeader()));
+  }
+
   @Test // (dependsOnMethods = "simpleIntegrationTest")
   public void testMonitoringLevel() throws Exception {
     int n = 1;
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
index fa8710f..604669b 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
@@ -41,6 +41,7 @@ import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.model.CurrentState;
@@ -310,20 +311,34 @@ public class PerInstanceAccessor extends AbstractHelixResource {
     }
     InstanceConfig instanceConfig = new InstanceConfig(record);
     ConfigAccessor configAccessor = getConfigAccessor();
+
     try {
       switch (command) {
-      case update:
-        configAccessor.updateInstanceConfig(clusterId, instanceName, instanceConfig);
-        break;
-      case delete:
-        HelixConfigScope instanceScope =
-            new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT)
-                .forCluster(clusterId).forParticipant(instanceName).build();
-        configAccessor.remove(instanceScope, record);
-        break;
-      default:
-        return badRequest(String.format("Unsupported command: %s", command));
+        case update:
+          /*
+           * The new instanceConfig will be merged with existing one.
+           * Even if the instance is disabled, non-valid instance topology config will cause rebalance
+           * failure. We are doing the check whenever user updates InstanceConfig.
+           */
+          validateDeltaTopologySettingInInstanceConfig(clusterId, instanceName, configAccessor,
+              instanceConfig, command);
+          configAccessor.updateInstanceConfig(clusterId, instanceName, instanceConfig);
+          break;
+        case delete:
+          validateDeltaTopologySettingInInstanceConfig(clusterId, instanceName, configAccessor,
+              instanceConfig, command);
+          HelixConfigScope instanceScope =
+              new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT)
+                  .forCluster(clusterId).forParticipant(instanceName).build();
+          configAccessor.remove(instanceScope, record);
+          break;
+        default:
+          return badRequest(String.format("Unsupported command: %s", command));
       }
+    } catch (IllegalArgumentException ex) {
+      LOG.error(String.format("Invalid topology setting for Instance : {}. Fail the config update",
+          instanceName), ex);
+      return serverError(ex);
     } catch (HelixException ex) {
       return notFound(ex.getMessage());
     } catch (Exception ex) {
@@ -544,4 +559,23 @@ public class PerInstanceAccessor extends AbstractHelixResource {
   private boolean validInstance(JsonNode node, String instanceName) {
     return instanceName.equals(node.get(Properties.id.name()).getValueAsText());
   }
+
+  private boolean validateDeltaTopologySettingInInstanceConfig(String clusterName,
+      String instanceName, ConfigAccessor configAccessor, InstanceConfig newInstanceConfig,
+      Command command) {
+    // Apply the requested delta to the config read via the accessor, then validate the outcome.
+    InstanceConfig projected = configAccessor.getInstanceConfig(clusterName, instanceName);
+    if (command != Command.delete) {
+      projected.getRecord().update(newInstanceConfig.getRecord());
+    } else {
+      // A delete only drops the simple fields whose keys appear in the incoming config.
+      Map<String, String> remaining = projected.getRecord().getSimpleFields();
+      for (String key : newInstanceConfig.getRecord().getSimpleFields().keySet()) {
+        remaining.remove(key);
+      }
+    }
+    // This method itself writes nothing back; it only reports on the projected config.
+    return projected.validateTopologySettingInInstanceConfig(
+        configAccessor.getClusterConfig(clusterName), instanceName);
+  }
 }
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
index 1fa852a..7c40e7b 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
@@ -409,6 +409,7 @@ public class TestPerInstanceAccessor extends AbstractTestClass {
   @Test(dependsOnMethods = "checkUpdateFails")
   public void testValidateWeightForInstance()
       throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
     // Empty out ClusterConfig's weight key setting and InstanceConfig's capacity maps for testing
     ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
     clusterConfig.getRecord()
@@ -462,5 +463,57 @@ public class TestPerInstanceAccessor extends AbstractTestClass {
     // Must have the results saying they are all valid (true) because capacity keys are set
     // in ClusterConfig
     node.iterator().forEachRemaining(child -> Assert.assertTrue(child.getBooleanValue()));
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+  /**
+   * Tests the topology sanity check applied when an instance config is updated via the REST API:
+   * an update with a well-formed DOMAIN succeeds, while a malformed DOMAIN is rejected (HTTP 500).
+   */
+  @Test(dependsOnMethods = "testValidateWeightForInstance")
+  public void testValidateDeltaInstanceConfigForUpdate() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    // Enable topology awareness for the cluster and empty its instance capacity key list
+    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.getRecord()
+        .setListField(ClusterConfig.ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name(),
+            new ArrayList<>());
+    clusterConfig.setTopologyAwareEnabled(true);
+    clusterConfig.setTopology("/Rack/Sub-Rack/Host/Instance");
+    clusterConfig.setFaultZoneType("Host");
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    String instanceName = CLUSTER_NAME + "localhost_12918"; // presumably == INSTANCE_NAME; verify
+    InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, instanceName);
+
+    // Put a DOMAIN carrying Rack, Sub-Rack and Host values into the instance's record
+    String domain = "Rack=rack1, Sub-Rack=Sub-Rack1, Host=Host-1";
+    ZNRecord record = instanceConfig.getRecord();
+    record.getSimpleFields().put(InstanceConfig.InstanceConfigProperty.DOMAIN.name(), domain);
+
+    // Send the modified record through the "update" command
+    Entity entity =
+        Entity.entity(OBJECT_MAPPER.writeValueAsString(record), MediaType.APPLICATION_JSON_TYPE);
+    Response response = new JerseyUriRequestBuilder(
+        "clusters/{}/instances/{}/configs?command=update&doSanityCheck=true")
+        .format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
+    // The update with the valid domain must be accepted
+    Assert.assertEquals(response.getStatus(), 200);
+    // Check the stored instance config now carries the new domain
+    Assert.assertEquals(
+        _configAccessor.getInstanceConfig(CLUSTER_NAME, instanceName).getDomainAsString(), domain);
+
+    // Set the domain to an invalid value
+    record.getSimpleFields()
+        .put(InstanceConfig.InstanceConfigProperty.DOMAIN.name(), "InvalidDomainValue");
+    entity =
+        Entity.entity(OBJECT_MAPPER.writeValueAsString(record), MediaType.APPLICATION_JSON_TYPE);
+    // Updating with the invalid domain must be rejected with 500 Internal Server Error
+    new JerseyUriRequestBuilder(
+        "clusters/{}/instances/{}/configs?command=update&doSanityCheck=true")
+        .expectedReturnStatusCode(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode())
+        .format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
+
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
 }