You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/04/01 22:47:42 UTC

[helix] 47/49: Make Helix REST realm-aware (#908)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch zooscalability
in repository https://gitbox.apache.org/repos/asf/helix.git

commit c7cdf986d9303ace339ae6497e9aa7c884562fb7
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Wed Mar 25 22:20:10 2020 -0700

    Make Helix REST realm-aware (#908)
    
    Helix REST needs to start using a realm-aware ZkClient on multi-zk mode. Also it needs to become a listener on routing data because we don't want to restart the HelixRestServer every time we update the routing data.
    
    Changelist:
    
    Make ServerContext listen on routing data paths if run on multi-zk mode
    Make HelixRestServer use RealmAwareZkClient (FederatedZkClient) on multi-zk mode
---
 .../java/org/apache/helix/task/TaskDriver.java     |   6 +-
 .../helix/rest/common/HelixRestNamespace.java      |  24 +-
 .../metadatastore/ZkMetadataStoreDirectory.java    |  55 +++-
 .../accessor/ZkRoutingDataReader.java              |  37 +--
 .../accessor/ZkRoutingDataWriter.java              |  17 +-
 .../apache/helix/rest/server/HelixRestMain.java    |   6 +-
 .../apache/helix/rest/server/HelixRestServer.java  |   2 +-
 .../apache/helix/rest/server/ServerContext.java    | 289 +++++++++++++++++----
 .../resources/helix/AbstractHelixResource.java     |  14 +-
 .../server/resources/helix/ClusterAccessor.java    |  11 +-
 .../server/resources/helix/ResourceAccessor.java   |  10 +-
 .../integration/TestRoutingDataUpdate.java         | 176 +++++++++++++
 .../helix/rest/server/AbstractTestClass.java       |  40 +--
 .../constant/MetadataStoreRoutingConstants.java    |   2 +
 .../helix/msdcommon/util/ZkValidationUtil.java     |   3 +-
 pom.xml                                            |  34 ---
 .../zookeeper/api/client/RealmAwareZkClient.java   |  16 ++
 .../zookeeper/util/HttpRoutingDataReader.java      |  14 +-
 .../impl/client/RealmAwareZkClientTestBase.java    |   2 +-
 19 files changed, 570 insertions(+), 188 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 987cc44..506a06b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -47,7 +47,7 @@ import org.apache.helix.model.builder.CustomModeISBuilder;
 import org.apache.helix.store.HelixPropertyStore;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.util.HelixUtil;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
 import org.slf4j.Logger;
@@ -98,12 +98,12 @@ public class TaskDriver {
   }
 
   @Deprecated
-  public TaskDriver(HelixZkClient client, String clusterName) {
+  public TaskDriver(RealmAwareZkClient client, String clusterName) {
     this(client, new ZkBaseDataAccessor<>(client), clusterName);
   }
 
   @Deprecated
-  public TaskDriver(HelixZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor,
+  public TaskDriver(RealmAwareZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor,
       String clusterName) {
     this(new ZKHelixAdmin(client), new ZKHelixDataAccessor(clusterName, baseAccessor),
         new ZkHelixPropertyStore<>(baseAccessor, PropertyPathBuilder.propertyStore(clusterName),
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java b/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java
index a2fb52c..0632f36 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java
@@ -34,7 +34,8 @@ public class HelixRestNamespace {
     NAME,
     METADATA_STORE_TYPE,
     METADATA_STORE_ADDRESS,
-    IS_DEFAULT
+    IS_DEFAULT,
+    MSDS_ENDPOINT
   }
 
   /**
@@ -55,7 +56,7 @@ public class HelixRestNamespace {
   private HelixMetadataStoreType _metadataStoreType;
 
   /**
-   * Address of metadata store. Should be informat of
+   * Address of metadata store. Should be in the format of
    * "[ip-address]:[port]" or "[dns-name]:[port]"
    */
   private String _metadataStoreAddress;
@@ -65,16 +66,27 @@ public class HelixRestNamespace {
    */
   private boolean _isDefault;
 
+  /**
+   * Endpoint for accessing MSDS for this namespace.
+   */
+  private String _msdsEndpoint;
+
   public HelixRestNamespace(String metadataStoreAddress) throws IllegalArgumentException {
     this(DEFAULT_NAMESPACE_NAME, HelixMetadataStoreType.ZOOKEEPER, metadataStoreAddress, true);
   }
 
-  public HelixRestNamespace(String name, HelixMetadataStoreType metadataStoreType, String metadataStoreAddress, boolean isDefault)
-      throws IllegalArgumentException {
+  public HelixRestNamespace(String name, HelixMetadataStoreType metadataStoreType,
+      String metadataStoreAddress, boolean isDefault) throws IllegalArgumentException {
+    this(name, metadataStoreType, metadataStoreAddress, isDefault, null);
+  }
+
+  public HelixRestNamespace(String name, HelixMetadataStoreType metadataStoreType,
+      String metadataStoreAddress, boolean isDefault, String msdsEndpoint) {
     _name = name;
     _metadataStoreAddress = metadataStoreAddress;
     _metadataStoreType = metadataStoreType;
     _isDefault = isDefault;
+    _msdsEndpoint = msdsEndpoint;
     validate();
   }
 
@@ -109,4 +121,8 @@ public class HelixRestNamespace {
     ret.put(HelixRestNamespaceProperty.IS_DEFAULT.name(), String.valueOf(_isDefault));
     return ret;
   }
+
+  public String getMsdsEndpoint() {
+    return _msdsEndpoint;
+  }
 }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java
index c83245f..42b2b17 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java
@@ -39,6 +39,7 @@ import org.apache.helix.rest.metadatastore.accessor.MetadataStoreRoutingDataWrit
 import org.apache.helix.rest.metadatastore.accessor.ZkRoutingDataReader;
 import org.apache.helix.rest.metadatastore.accessor.ZkRoutingDataWriter;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
 import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
@@ -98,20 +99,27 @@ public class ZkMetadataStoreDirectory implements MetadataStoreDirectory, Routing
     if (!_routingZkAddressMap.containsKey(namespace)) {
       synchronized (_routingZkAddressMap) {
         if (!_routingZkAddressMap.containsKey(namespace)) {
-          // Ensure that ROUTING_DATA_PATH exists in ZK.
-          HelixZkClient zkClient = DedicatedZkClientFactory.getInstance()
-              .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
-                  new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer()));
+          HelixZkClient zkClient = null;
           try {
-            zkClient.createPersistent(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, true);
-          } catch (ZkNodeExistsException e) {
-            // The node already exists and it's okay
+            // Ensure that ROUTING_DATA_PATH exists in ZK.
+            zkClient = DedicatedZkClientFactory.getInstance()
+                .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
+                    new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer()));
+            createRoutingDataPath(zkClient, zkAddress);
+          } finally {
+            if (zkClient != null && !zkClient.isClosed()) {
+              zkClient.close();
+            }
+          }
+          try {
+            _routingZkAddressMap.put(namespace, zkAddress);
+            _routingDataReaderMap
+                .put(namespace, new ZkRoutingDataReader(namespace, zkAddress, this));
+            _routingDataWriterMap.put(namespace, new ZkRoutingDataWriter(namespace, zkAddress));
+          } catch (IllegalArgumentException | IllegalStateException e) {
+            LOG.error("ZkMetadataStoreDirectory: initializing ZkRoutingDataReader/Writer failed!",
+                e);
           }
-
-          _routingZkAddressMap.put(namespace, zkAddress);
-          _routingDataReaderMap.put(namespace, new ZkRoutingDataReader(namespace, zkAddress, this));
-          _routingDataWriterMap.put(namespace, new ZkRoutingDataWriter(namespace, zkAddress));
-
           // Populate realmToShardingKeys with ZkRoutingDataReader
           Map<String, List<String>> rawRoutingData =
               _routingDataReaderMap.get(namespace).getRoutingData();
@@ -119,7 +127,8 @@ public class ZkMetadataStoreDirectory implements MetadataStoreDirectory, Routing
           try {
             _routingDataMap.put(namespace, new TrieRoutingData(rawRoutingData));
           } catch (InvalidRoutingDataException e) {
-            LOG.warn("TrieRoutingData is not created for namespace {}", namespace, e);
+            LOG.warn("ZkMetadataStoreDirectory: TrieRoutingData is not created for namespace {}",
+                namespace, e);
           }
         }
       }
@@ -145,7 +154,7 @@ public class ZkMetadataStoreDirectory implements MetadataStoreDirectory, Routing
       throw new NoSuchElementException("Namespace " + namespace + " does not exist!");
     }
     Set<String> allShardingKeys = new HashSet<>();
-    _realmToShardingKeysMap.get(namespace).values().forEach(keys -> allShardingKeys.addAll(keys));
+    _realmToShardingKeysMap.get(namespace).values().forEach(allShardingKeys::addAll);
     return allShardingKeys;
   }
 
@@ -339,4 +348,22 @@ public class ZkMetadataStoreDirectory implements MetadataStoreDirectory, Routing
     _routingDataMap.clear();
     _zkMetadataStoreDirectoryInstance = null;
   }
+
+  /**
+   * Make sure the root routing data path exists. Also, register the routing ZK address.
+   * @param zkClient
+   */
+  public static void createRoutingDataPath(HelixZkClient zkClient, String zkAddress) {
+    try {
+      zkClient.createPersistent(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, true);
+    } catch (ZkNodeExistsException e) {
+      // The node already exists and it's okay
+    }
+    // Make sure ROUTING_DATA_PATH is mapped to the routing ZK so that FederatedZkClient used
+    // in Helix REST can subscribe to the routing data path
+    ZNRecord znRecord = new ZNRecord(MetadataStoreRoutingConstants.ROUTING_DATA_PATH.substring(1));
+    znRecord.setListField(MetadataStoreRoutingConstants.ROUTING_ZK_ADDRESS_KEY,
+        Collections.singletonList(zkAddress));
+    zkClient.writeData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, znRecord);
+  }
 }
\ No newline at end of file
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataReader.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataReader.java
index 6c75618..cfe6eb5 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataReader.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataReader.java
@@ -27,7 +27,9 @@ import java.util.Map;
 import org.apache.helix.msdcommon.callback.RoutingDataListener;
 import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
 import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
+import org.apache.helix.rest.metadatastore.ZkMetadataStoreDirectory;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
@@ -35,7 +37,6 @@ import org.apache.helix.zookeeper.zkclient.IZkChildListener;
 import org.apache.helix.zookeeper.zkclient.IZkDataListener;
 import org.apache.helix.zookeeper.zkclient.IZkStateListener;
 import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
-import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
 import org.apache.zookeeper.Watcher;
 
 
@@ -59,24 +60,11 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD
         .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
             new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer()));
 
-    // Ensure that ROUTING_DATA_PATH exists in ZK. If not, create
-    // create() semantic will fail if it already exists
-    try {
-      _zkClient.createPersistent(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, true);
-    } catch (ZkNodeExistsException e) {
-      // This is okay
-    }
+    ZkMetadataStoreDirectory.createRoutingDataPath(_zkClient, _zkAddress);
 
     _routingDataListener = routingDataListener;
     if (_routingDataListener != null) {
-      // Subscribe child changes
-      _zkClient.subscribeChildChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, this);
-      // Subscribe data changes
-      for (String child : _zkClient.getChildren(MetadataStoreRoutingConstants.ROUTING_DATA_PATH)) {
-        _zkClient
-            .subscribeDataChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + child,
-                this);
-      }
+      _zkClient.subscribeRoutingDataChanges(this, this);
     }
   }
 
@@ -118,7 +106,7 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD
 
   @Override
   public synchronized void handleDataChange(String s, Object o) {
-    if (_zkClient.isClosed()) {
+    if (_zkClient == null || _zkClient.isClosed()) {
       return;
     }
     _routingDataListener.refreshRoutingData(_namespace);
@@ -138,7 +126,7 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD
 
   @Override
   public synchronized void handleStateChanged(Watcher.Event.KeeperState state) {
-    if (_zkClient.isClosed()) {
+    if (_zkClient == null || _zkClient.isClosed()) {
       return;
     }
     _routingDataListener.refreshRoutingData(_namespace);
@@ -146,7 +134,7 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD
 
   @Override
   public synchronized void handleNewSession(String sessionId) {
-    if (_zkClient.isClosed()) {
+    if (_zkClient == null || _zkClient.isClosed()) {
       return;
     }
     _routingDataListener.refreshRoutingData(_namespace);
@@ -154,24 +142,19 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD
 
   @Override
   public synchronized void handleSessionEstablishmentError(Throwable error) {
-    if (_zkClient.isClosed()) {
+    if (_zkClient == null || _zkClient.isClosed()) {
       return;
     }
     _routingDataListener.refreshRoutingData(_namespace);
   }
 
   private void handleResubscription() {
-    if (_zkClient.isClosed()) {
+    if (_zkClient == null || _zkClient.isClosed()) {
       return;
     }
-
     // Renew subscription
     _zkClient.unsubscribeAll();
-    _zkClient.subscribeChildChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, this);
-    for (String child : _zkClient.getChildren(MetadataStoreRoutingConstants.ROUTING_DATA_PATH)) {
-      _zkClient.subscribeDataChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + child,
-          this);
-    }
+    _zkClient.subscribeRoutingDataChanges(this, this);
     _routingDataListener.refreshRoutingData(_namespace);
   }
 }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataWriter.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataWriter.java
index 32b7681..791d9bb 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataWriter.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataWriter.java
@@ -30,6 +30,7 @@ import javax.ws.rs.core.Response;
 
 import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
 import org.apache.helix.rest.common.HttpConstants;
+import org.apache.helix.rest.metadatastore.ZkMetadataStoreDirectory;
 import org.apache.helix.rest.metadatastore.concurrency.ZkDistributedLeaderElection;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -82,20 +83,16 @@ public class ZkRoutingDataWriter implements MetadataStoreRoutingDataWriter {
         .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
             new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer()));
 
-    // Ensure that ROUTING_DATA_PATH exists in ZK. If not, create
-    // create() semantic will fail if it already exists
-    try {
-      _zkClient.createPersistent(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, true);
-    } catch (ZkNodeExistsException e) {
-      // This is okay
-    }
+    ZkMetadataStoreDirectory.createRoutingDataPath(_zkClient, zkAddress);
 
     // Get the hostname (REST endpoint) from System property
     String hostName = System.getProperty(MetadataStoreRoutingConstants.MSDS_SERVER_HOSTNAME_KEY);
     if (hostName == null || hostName.isEmpty()) {
-      throw new IllegalStateException(
-          "Hostname is not set or is empty. System.getProperty fails to fetch "
-              + MetadataStoreRoutingConstants.MSDS_SERVER_HOSTNAME_KEY + ".");
+      String errMsg =
+          "ZkRoutingDataWriter: Hostname is not set or is empty. System.getProperty fails to fetch "
+              + MetadataStoreRoutingConstants.MSDS_SERVER_HOSTNAME_KEY;
+      LOG.error(errMsg);
+      throw new IllegalStateException(errMsg);
     }
     _myHostName = HttpConstants.HTTP_PROTOCOL_PREFIX + hostName;
 
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestMain.java b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestMain.java
index b28f227..49940c3 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestMain.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestMain.java
@@ -146,10 +146,12 @@ public class HelixRestMain {
       // Currently we don't support adding default namespace through yaml manifest so all
       // namespaces created here will not be default
       // TODO: support specifying default namespace from config file
-      namespaces.add(new HelixRestNamespace(config.get(HelixRestNamespace.HelixRestNamespaceProperty.NAME.name()),
+      namespaces.add(new HelixRestNamespace(
+          config.get(HelixRestNamespace.HelixRestNamespaceProperty.NAME.name()),
           HelixRestNamespace.HelixMetadataStoreType.valueOf(
               config.get(HelixRestNamespace.HelixRestNamespaceProperty.METADATA_STORE_TYPE.name())),
-          config.get(HelixRestNamespace.HelixRestNamespaceProperty.METADATA_STORE_ADDRESS.name()), false));
+          config.get(HelixRestNamespace.HelixRestNamespaceProperty.METADATA_STORE_ADDRESS.name()),
+          false, config.get(HelixRestNamespace.HelixRestNamespaceProperty.MSDS_ENDPOINT.name())));
     }
   }
 
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java
index 64c1139..e6b5b34 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java
@@ -149,7 +149,7 @@ public class HelixRestServer {
     // Enable the default statistical monitoring MBean for Jersey server
     cfg.property(ServerProperties.MONITORING_STATISTICS_MBEANS_ENABLED, true);
     cfg.property(ContextPropertyKeys.SERVER_CONTEXT.name(),
-        new ServerContext(namespace.getMetadataStoreAddress()));
+        new ServerContext(namespace.getMetadataStoreAddress(), namespace.getMsdsEndpoint()));
     if (type == ServletType.DEFAULT_SERVLET) {
       cfg.property(ContextPropertyKeys.ALL_NAMESPACES.name(), _helixNamespaces);
     } else {
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
index b845356..52f1738 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
@@ -20,34 +20,53 @@ package org.apache.helix.rest.server;
  * under the License.
  */
 
-import java.util.HashMap;
+import java.io.IOException;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.InstanceType;
-import org.apache.helix.rest.metadatastore.ZkMetadataStoreDirectory;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.zookeeper.impl.client.ZkClient;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
-import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
+import org.apache.helix.rest.metadatastore.ZkMetadataStoreDirectory;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
+import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
+import org.apache.helix.zookeeper.util.HttpRoutingDataReader;
+import org.apache.helix.zookeeper.zkclient.IZkChildListener;
+import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.zkclient.IZkStateListener;
 import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
 import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
+public class ServerContext implements IZkDataListener, IZkChildListener, IZkStateListener {
+  private static final Logger LOG = LoggerFactory.getLogger(ServerContext.class);
 
-public class ServerContext {
   private final String _zkAddr;
-  private HelixZkClient _zkClient;
-  private ZKHelixAdmin _zkHelixAdmin;
-  private ClusterSetup _clusterSetup;
-  private ConfigAccessor _configAccessor;
+  private final String _msdsEndpoint;
+  private volatile RealmAwareZkClient _zkClient;
+
+  private volatile ZKHelixAdmin _zkHelixAdmin;
+  private volatile ClusterSetup _clusterSetup;
+  private volatile ConfigAccessor _configAccessor;
   // A lazily-initialized base data accessor that reads/writes byte array to ZK
   // TODO: Only read (deserialize) is supported at this time. This baseDataAccessor should support write (serialize) as needs arise
   private volatile ZkBaseDataAccessor<byte[]> _byteArrayZkBaseDataAccessor;
@@ -55,76 +74,149 @@ public class ServerContext {
   private final Map<String, HelixDataAccessor> _helixDataAccessorPool;
   // 1 Cluster name will correspond to 1 task driver
   private final Map<String, TaskDriver> _taskDriverPool;
+
+  /**
+   * Multi-ZK support
+   */
   private ZkMetadataStoreDirectory _zkMetadataStoreDirectory;
+  // Create a dedicated ZkClient for listening to data changes in routing data
+  private RealmAwareZkClient _zkClientForListener;
 
   public ServerContext(String zkAddr) {
+    this(zkAddr, null);
+  }
+
+  /**
+   * Initializes a ServerContext for this namespace.
+   * @param zkAddr routing ZK address (on multi-zk mode)
+   * @param msdsEndpoint if given, this server context will try to read routing data from this MSDS.
+   */
+  public ServerContext(String zkAddr, String msdsEndpoint) {
     _zkAddr = zkAddr;
+    _msdsEndpoint = msdsEndpoint; // only applicable on multi-zk mode
 
     // We should NOT initiate _zkClient and anything that depends on _zkClient in
     // constructor, as it is reasonable to start up HelixRestServer first and then
     // ZooKeeper. In this case, initializing _zkClient will fail and HelixRestServer
     // cannot be started correctly.
-    _helixDataAccessorPool = new HashMap<>();
-    _taskDriverPool = new HashMap<>();
+    _helixDataAccessorPool = new ConcurrentHashMap<>();
+    _taskDriverPool = new ConcurrentHashMap<>();
+
     // Initialize the singleton ZkMetadataStoreDirectory instance to allow it to be closed later
     _zkMetadataStoreDirectory = ZkMetadataStoreDirectory.getInstance();
   }
 
-  public HelixZkClient getHelixZkClient() {
+  public RealmAwareZkClient getRealmAwareZkClient() {
     if (_zkClient == null) {
-      HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
-      clientConfig.setZkSerializer(new ZNRecordSerializer());
-      _zkClient = SharedZkClientFactory.getInstance()
-          .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddr), clientConfig);
+      synchronized (this) {
+        if (_zkClient == null) {
+          // If the multi ZK config is enabled, use FederatedZkClient on multi-realm mode
+          if (Boolean.parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) {
+            LOG.info("ServerContext: initializing FederatedZkClient with routing ZK at {}!",
+                _zkAddr);
+            try {
+              RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder =
+                  new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
+              // If MSDS endpoint is set for this namespace, use that instead.
+              if (_msdsEndpoint != null && !_msdsEndpoint.isEmpty()) {
+                connectionConfigBuilder.setMsdsEndpoint(_msdsEndpoint);
+              }
+              _zkClient = new FederatedZkClient(connectionConfigBuilder.build(),
+                  new RealmAwareZkClient.RealmAwareZkClientConfig()
+                      .setZkSerializer(new ZNRecordSerializer()));
+
+              // Make sure the ServerContext is subscribed to routing data change so that it knows
+              // when to reset ZkClient and Helix APIs
+              if (_zkClientForListener == null) {
+                _zkClientForListener = DedicatedZkClientFactory.getInstance()
+                    .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddr),
+                        new HelixZkClient.ZkClientConfig()
+                            .setZkSerializer(new ZNRecordSerializer()));
+              }
+              // Refresh data subscription
+              _zkClientForListener.unsubscribeAll();
+              _zkClientForListener.subscribeRoutingDataChanges(this, this);
+            } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+              throw new HelixException("Failed to create FederatedZkClient!", e);
+            }
+          } else {
+            // If multi ZK config is not set, just connect to the ZK address given
+            HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+            clientConfig.setZkSerializer(new ZNRecordSerializer());
+            _zkClient = SharedZkClientFactory.getInstance()
+                .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddr), clientConfig);
+          }
+        }
+      }
     }
     return _zkClient;
   }
 
   @Deprecated
   public ZkClient getZkClient() {
-    return (ZkClient) getHelixZkClient();
+    return (ZkClient) getRealmAwareZkClient();
   }
 
   public HelixAdmin getHelixAdmin() {
     if (_zkHelixAdmin == null) {
-      _zkHelixAdmin = new ZKHelixAdmin(getHelixZkClient());
+      synchronized (this) {
+        if (_zkHelixAdmin == null) {
+          _zkHelixAdmin = new ZKHelixAdmin(getRealmAwareZkClient());
+        }
+      }
     }
     return _zkHelixAdmin;
   }
 
   public ClusterSetup getClusterSetup() {
     if (_clusterSetup == null) {
-      _clusterSetup = new ClusterSetup(getHelixZkClient(), getHelixAdmin());
+      synchronized (this) {
+        if (_clusterSetup == null) {
+          _clusterSetup = new ClusterSetup(getRealmAwareZkClient(), getHelixAdmin());
+        }
+      }
     }
     return _clusterSetup;
   }
 
   public TaskDriver getTaskDriver(String clusterName) {
-    synchronized (_taskDriverPool) {
-      if (!_taskDriverPool.containsKey(clusterName)) {
-        _taskDriverPool.put(clusterName, new TaskDriver(getHelixZkClient(), clusterName));
+    TaskDriver taskDriver = _taskDriverPool.get(clusterName);
+    if (taskDriver == null) {
+      synchronized (this) {
+        if (!_taskDriverPool.containsKey(clusterName)) {
+          _taskDriverPool.put(clusterName, new TaskDriver(getRealmAwareZkClient(), clusterName));
+        }
+        taskDriver = _taskDriverPool.get(clusterName);
       }
-      return _taskDriverPool.get(clusterName);
     }
+    return taskDriver;
   }
 
   public ConfigAccessor getConfigAccessor() {
     if (_configAccessor == null) {
-      _configAccessor = new ConfigAccessor(getHelixZkClient());
+      synchronized (this) {
+        if (_configAccessor == null) {
+          _configAccessor = new ConfigAccessor(getRealmAwareZkClient());
+        }
+      }
     }
     return _configAccessor;
   }
 
-  public HelixDataAccessor getDataAccssor(String clusterName) {
-    synchronized (_helixDataAccessorPool) {
-      if (!_helixDataAccessorPool.containsKey(clusterName)) {
-        ZkBaseDataAccessor<ZNRecord> baseDataAccessor =
-            new ZkBaseDataAccessor<>(getHelixZkClient());
-        _helixDataAccessorPool.put(clusterName,
-            new ZKHelixDataAccessor(clusterName, InstanceType.ADMINISTRATOR, baseDataAccessor));
+  public HelixDataAccessor getDataAccessor(String clusterName) {
+    HelixDataAccessor dataAccessor = _helixDataAccessorPool.get(clusterName);
+    if (dataAccessor == null) {
+      synchronized (this) {
+        if (!_helixDataAccessorPool.containsKey(clusterName)) {
+          ZkBaseDataAccessor<ZNRecord> baseDataAccessor =
+              new ZkBaseDataAccessor<>(getRealmAwareZkClient());
+          _helixDataAccessorPool.put(clusterName,
+              new ZKHelixDataAccessor(clusterName, InstanceType.ADMINISTRATOR, baseDataAccessor));
+        }
+        dataAccessor = _helixDataAccessorPool.get(clusterName);
       }
-      return _helixDataAccessorPool.get(clusterName);
     }
+    return dataAccessor;
   }
 
   /**
@@ -132,30 +224,26 @@ public class ServerContext {
    * @return
    */
   public ZkBaseDataAccessor<byte[]> getByteArrayZkBaseDataAccessor() {
-    ZkBaseDataAccessor<byte[]> byteArrayZkBaseDataAccessor = _byteArrayZkBaseDataAccessor;
-    if (byteArrayZkBaseDataAccessor != null) { // First check (no locking)
-      return byteArrayZkBaseDataAccessor;
-    }
+    if (_byteArrayZkBaseDataAccessor == null) {
+      synchronized (this) {
+        if (_byteArrayZkBaseDataAccessor == null) {
 
-    synchronized (this) {
-      if (_byteArrayZkBaseDataAccessor == null) { // Second check (with locking)
-        _byteArrayZkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkAddr, new ZkSerializer() {
-          @Override
-          public byte[] serialize(Object o)
-              throws ZkMarshallingError {
-            // TODO: Support serialize for write methods if necessary
-            throw new UnsupportedOperationException("serialize() is not supported.");
-          }
+          _byteArrayZkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkAddr, new ZkSerializer() {
+            @Override
+            public byte[] serialize(Object o) throws ZkMarshallingError {
+              // TODO: Support serialize for write methods if necessary
+              throw new UnsupportedOperationException("serialize() is not supported.");
+            }
 
-          @Override
-          public Object deserialize(byte[] bytes)
-              throws ZkMarshallingError {
-            return bytes;
-          }
-        });
+            @Override
+            public Object deserialize(byte[] bytes) throws ZkMarshallingError {
+              return bytes;
+            }
+          });
+        }
       }
-      return _byteArrayZkBaseDataAccessor;
     }
+    return _byteArrayZkBaseDataAccessor;
   }
 
   public void close() {
@@ -165,5 +253,96 @@ public class ServerContext {
     if (_zkMetadataStoreDirectory != null) {
       _zkMetadataStoreDirectory.close();
     }
+    if (_zkClientForListener != null) {
+      _zkClientForListener.close();
+    }
+  }
+
+  @Override
+  public void handleChildChange(String parentPath, List<String> currentChilds) {
+    if (_zkClientForListener == null || _zkClientForListener.isClosed()) {
+      return;
+    }
+    // Resubscribe
+    _zkClientForListener.unsubscribeAll();
+    _zkClientForListener.subscribeRoutingDataChanges(this, this);
+    resetZkResources();
+  }
+
+  @Override
+  public void handleDataChange(String dataPath, Object data) {
+    if (_zkClientForListener == null || _zkClientForListener.isClosed()) {
+      return;
+    }
+    resetZkResources();
+  }
+
+  @Override
+  public void handleDataDeleted(String dataPath) {
+    // NOP because this is covered by handleChildChange()
+  }
+
+  @Override
+  public void handleStateChanged(Watcher.Event.KeeperState state) {
+    if (_zkClientForListener == null || _zkClientForListener.isClosed()) {
+      return;
+    }
+    // Resubscribe
+    _zkClientForListener.unsubscribeAll();
+    _zkClientForListener.subscribeRoutingDataChanges(this, this);
+    resetZkResources();
+  }
+
+  @Override
+  public void handleNewSession(String sessionId) {
+    if (_zkClientForListener == null || _zkClientForListener.isClosed()) {
+      return;
+    }
+    // Resubscribe
+    _zkClientForListener.unsubscribeAll();
+    _zkClientForListener.subscribeRoutingDataChanges(this, this);
+    resetZkResources();
+  }
+
+  @Override
+  public void handleSessionEstablishmentError(Throwable error) {
+    if (_zkClientForListener == null || _zkClientForListener.isClosed()) {
+      return;
+    }
+    // Resubscribe
+    _zkClientForListener.unsubscribeAll();
+    _zkClientForListener.subscribeRoutingDataChanges(this, this);
+    resetZkResources();
+  }
+
+  /**
+   * Resets all internally cached routing data by closing and nullifying the ZkClient and Helix APIs.
+   * This is okay because routing data update should be infrequent.
+   */
+  private void resetZkResources() {
+    synchronized (this) {
+      LOG.info("ServerContext: Resetting ZK resources due to routing data change! Routing ZK: {}",
+          _zkAddr);
+      try {
+        // Reset HttpRoutingDataReader's cache
+        HttpRoutingDataReader.reset();
+        // All Helix APIs will be closed implicitly because ZkClient is closed
+        if (_zkClient != null && !_zkClient.isClosed()) {
+          _zkClient.close();
+        }
+        if (_byteArrayZkBaseDataAccessor != null) {
+          _byteArrayZkBaseDataAccessor.close();
+        }
+        _zkClient = null;
+        _zkHelixAdmin = null;
+        _clusterSetup = null;
+        _configAccessor = null;
+        _byteArrayZkBaseDataAccessor = null;
+        _helixDataAccessorPool.clear();
+        _taskDriverPool.clear();
+      } catch (Exception e) {
+        LOG.error("Failed to reset ZkClient and Helix APIs in ServerContext!", e);
+      }
+    }
   }
 }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java
index f1bb583..487316b 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java
@@ -24,15 +24,15 @@ import java.io.IOException;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.zookeeper.impl.client.ZkClient;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.rest.common.ContextPropertyKeys;
 import org.apache.helix.rest.server.ServerContext;
 import org.apache.helix.rest.server.resources.AbstractResource;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
 
 
 /**
@@ -42,14 +42,14 @@ import org.apache.helix.tools.ClusterSetup;
  */
 public class AbstractHelixResource extends AbstractResource {
 
-  public HelixZkClient getHelixZkClient() {
+  public RealmAwareZkClient getRealmAwareZkClient() {
     ServerContext serverContext = getServerContext();
-    return serverContext.getHelixZkClient();
+    return serverContext.getRealmAwareZkClient();
   }
 
   @Deprecated
   public ZkClient getZkClient() {
-    return (ZkClient) getHelixZkClient();
+    return (ZkClient) getRealmAwareZkClient();
   }
 
   public HelixAdmin getHelixAdmin() {
@@ -74,7 +74,7 @@ public class AbstractHelixResource extends AbstractResource {
 
   public HelixDataAccessor getDataAccssor(String clusterName) {
     ServerContext serverContext = getServerContext();
-    return serverContext.getDataAccssor(clusterName);
+    return serverContext.getDataAccessor(clusterName);
   }
 
   protected ZkBaseDataAccessor<byte[]> getByteArrayDataAccessor() {
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
index 762210a..fe824b1 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
@@ -44,22 +44,22 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyPathBuilder;
-import org.apache.helix.model.RESTConfig;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.manager.zk.ZKUtil;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ControllerHistory;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.MaintenanceSignal;
 import org.apache.helix.model.Message;
+import org.apache.helix.model.RESTConfig;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.rest.server.json.cluster.ClusterTopology;
 import org.apache.helix.rest.server.service.ClusterService;
 import org.apache.helix.rest.server.service.ClusterServiceImpl;
 import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.codehaus.jackson.type.TypeReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -137,7 +137,6 @@ public class ClusterAccessor extends AbstractHelixResource {
       @DefaultValue("false") @QueryParam("recreate") String recreate) {
     boolean recreateIfExists = Boolean.valueOf(recreate);
     ClusterSetup clusterSetup = getClusterSetup();
-
     try {
       clusterSetup.addCluster(clusterId, recreateIfExists);
     } catch (Exception ex) {
@@ -454,7 +453,7 @@ public class ClusterAccessor extends AbstractHelixResource {
       LOG.error("Failed to deserialize user's input {}. Exception: {}.", content, e);
       return badRequest("Input is not a valid ZNRecord!");
     }
-    HelixZkClient zkClient = getHelixZkClient();
+    RealmAwareZkClient zkClient = getRealmAwareZkClient();
     String path = PropertyPathBuilder.stateModelDef(clusterId);
     try {
       ZKUtil.createChildren(zkClient, path, record);
@@ -638,7 +637,7 @@ public class ClusterAccessor extends AbstractHelixResource {
   }
 
   private boolean doesClusterExist(String cluster) {
-    HelixZkClient zkClient = getHelixZkClient();
+    RealmAwareZkClient zkClient = getRealmAwareZkClient();
     return ZKUtil.isClusterSetup(cluster, zkClient);
   }
 
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
index ca2189e..b8c7c38 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
@@ -41,18 +41,18 @@ import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixException;
 import org.apache.helix.PropertyPathBuilder;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.codehaus.jackson.type.TypeReference;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.codehaus.jackson.node.ArrayNode;
 import org.codehaus.jackson.node.JsonNodeFactory;
 import org.codehaus.jackson.node.ObjectNode;
+import org.codehaus.jackson.type.TypeReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,7 +79,7 @@ public class ResourceAccessor extends AbstractHelixResource {
     ObjectNode root = JsonNodeFactory.instance.objectNode();
     root.put(Properties.id.name(), JsonNodeFactory.instance.textNode(clusterId));
 
-    HelixZkClient zkClient = getHelixZkClient();
+    RealmAwareZkClient zkClient = getRealmAwareZkClient();
 
     ArrayNode idealStatesNode = root.putArray(ResourceProperties.idealStates.name());
     ArrayNode externalViewsNode = root.putArray(ResourceProperties.externalViews.name());
@@ -109,7 +109,7 @@ public class ResourceAccessor extends AbstractHelixResource {
   @Path("health")
   public Response getResourceHealth(@PathParam("clusterId") String clusterId) {
 
-    HelixZkClient zkClient = getHelixZkClient();
+    RealmAwareZkClient zkClient = getRealmAwareZkClient();
 
     List<String> resourcesInIdealState =
         zkClient.getChildren(PropertyPathBuilder.idealState(clusterId));
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/integration/TestRoutingDataUpdate.java b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/integration/TestRoutingDataUpdate.java
new file mode 100644
index 0000000..0babf05
--- /dev/null
+++ b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/integration/TestRoutingDataUpdate.java
@@ -0,0 +1,176 @@
+package org.apache.helix.rest.metadatastore.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
+import org.apache.helix.rest.server.AbstractTestClass;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * TestRoutingDataUpdate tests that Helix REST server's ServerContext gets a proper update whenever
+ * there is change in the routing data.
+ */
+public class TestRoutingDataUpdate extends AbstractTestClass {
+  private static final String CLUSTER_0_SHARDING_KEY = "/TestRoutingDataUpdate-cluster-0";
+  private static final String CLUSTER_1_SHARDING_KEY = "/TestRoutingDataUpdate-cluster-1";
+  private final Map<String, List<String>> _routingData = new HashMap<>();
+
+  @BeforeClass
+  public void beforeClass() {
+    System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_HOSTNAME_KEY,
+        getBaseUri().getHost());
+    System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_PORT_KEY,
+        Integer.toString(getBaseUri().getPort()));
+
+    // Set the multi-zk config
+    System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, "true");
+    // Set the MSDS address
+    String msdsEndpoint = getBaseUri().toString();
+    System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY, msdsEndpoint);
+
+    // Restart Helix Rest server to get a fresh ServerContext created
+    restartRestServer();
+  }
+
+  @AfterClass
+  public void afterClass() {
+    // Clear all property
+    System.clearProperty(MetadataStoreRoutingConstants.MSDS_SERVER_HOSTNAME_KEY);
+    System.clearProperty(MetadataStoreRoutingConstants.MSDS_SERVER_PORT_KEY);
+    System.clearProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
+
+    restartRestServer();
+  }
+
+  @Test
+  public void testRoutingDataUpdate() throws Exception {
+    // Set up routing data
+    _routingData.put(ZK_ADDR, Arrays.asList(CLUSTER_0_SHARDING_KEY, CLUSTER_1_SHARDING_KEY));
+    _routingData.put(_zkAddrTestNS, new ArrayList<>());
+    String routingDataString = OBJECT_MAPPER.writeValueAsString(_routingData);
+    put(MetadataStoreRoutingConstants.MSDS_GET_ALL_ROUTING_DATA_ENDPOINT, null,
+        Entity.entity(routingDataString, MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.CREATED.getStatusCode());
+
+    // Need to wait so that ServerContext processes the callback
+    // TODO: Think of another way to wait -
+    // this is only used because of the nature of the testing environment
+    // in production, the server might return a 500 if a http call comes in before callbacks get
+    // processed fully
+    Thread.sleep(500L);
+
+    // Create the first cluster using Helix REST API via ClusterAccessor
+    put("/clusters" + CLUSTER_0_SHARDING_KEY, null,
+        Entity.entity("", MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.CREATED.getStatusCode());
+    // Check that the first cluster is created in the first ZK as designated by routing data
+    Assert.assertTrue(_gZkClient.exists(CLUSTER_0_SHARDING_KEY));
+    Assert.assertFalse(_gZkClientTestNS.exists(CLUSTER_0_SHARDING_KEY));
+
+    // Change the routing data mapping so that CLUSTER_1 points to the second ZK
+    _routingData.clear();
+    _routingData.put(ZK_ADDR, Collections.singletonList(CLUSTER_0_SHARDING_KEY));
+    _routingData.put(_zkAddrTestNS, Collections.singletonList(CLUSTER_1_SHARDING_KEY));
+    routingDataString = OBJECT_MAPPER.writeValueAsString(_routingData);
+    put(MetadataStoreRoutingConstants.MSDS_GET_ALL_ROUTING_DATA_ENDPOINT, null,
+        Entity.entity(routingDataString, MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.CREATED.getStatusCode());
+
+    // Need to wait so that ServerContext processes the callback
+    // TODO: Think of another way to wait -
+    // this is only used because of the nature of the testing environment
+    // in production, the server might return a 500 if a http call comes in before callbacks get
+    // processed fully
+    Thread.sleep(500L);
+
+    // Create the second cluster using Helix REST API via ClusterAccessor
+    put("/clusters" + CLUSTER_1_SHARDING_KEY, null,
+        Entity.entity("", MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.CREATED.getStatusCode());
+    // Check that the second cluster is created in the second ZK as designated by routing data
+    Assert.assertTrue(_gZkClientTestNS.exists(CLUSTER_1_SHARDING_KEY));
+    Assert.assertFalse(_gZkClient.exists(CLUSTER_1_SHARDING_KEY));
+
+    // Remove all routing data
+    put(MetadataStoreRoutingConstants.MSDS_GET_ALL_ROUTING_DATA_ENDPOINT, null, Entity
+        .entity(OBJECT_MAPPER.writeValueAsString(Collections.emptyMap()),
+            MediaType.APPLICATION_JSON_TYPE), Response.Status.CREATED.getStatusCode());
+
+    // Need to wait so that ServerContext processes the callback
+    // TODO: Think of another way to wait -
+    // this is only used because of the nature of the testing environment
+    // in production, the server might return a 500 if a http call comes in before callbacks get
+    // processed fully
+    Thread.sleep(500L);
+
+    // Delete clusters - both should fail because routing data don't have these clusters
+    delete("/clusters" + CLUSTER_0_SHARDING_KEY,
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
+    delete("/clusters" + CLUSTER_1_SHARDING_KEY,
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
+
+    // Set the routing data again
+    put(MetadataStoreRoutingConstants.MSDS_GET_ALL_ROUTING_DATA_ENDPOINT, null,
+        Entity.entity(routingDataString, MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.CREATED.getStatusCode());
+
+    // Need to wait so that ServerContext processes the callback
+    // TODO: Think of another way to wait -
+    // this is only used because of the nature of the testing environment
+    // in production, the server might return a 500 if a http call comes in before callbacks get
+    // processed fully
+    Thread.sleep(500L);
+
+    // Attempt deletion again - now they should succeed
+    delete("/clusters" + CLUSTER_0_SHARDING_KEY, Response.Status.OK.getStatusCode());
+    delete("/clusters" + CLUSTER_1_SHARDING_KEY, Response.Status.OK.getStatusCode());
+
+    // Double-verify using ZkClients
+    Assert.assertFalse(_gZkClientTestNS.exists(CLUSTER_1_SHARDING_KEY));
+    Assert.assertFalse(_gZkClient.exists(CLUSTER_0_SHARDING_KEY));
+
+    // Remove all routing data
+    put(MetadataStoreRoutingConstants.MSDS_GET_ALL_ROUTING_DATA_ENDPOINT, null, Entity
+        .entity(OBJECT_MAPPER.writeValueAsString(Collections.emptyMap()),
+            MediaType.APPLICATION_JSON_TYPE), Response.Status.CREATED.getStatusCode());
+  }
+
+  private void restartRestServer() {
+    if (_helixRestServer != null) {
+      _helixRestServer.shutdown();
+    }
+    _helixRestServer = startRestServer();
+  }
+}
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
index c5ffd41..0fee10d 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
@@ -189,21 +189,7 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
           @Override
           public void start() {
             if (_helixRestServer == null) {
-              // Create namespace manifest map
-              List<HelixRestNamespace> namespaces = new ArrayList<>();
-              // Add test namespace
-              namespaces.add(new HelixRestNamespace(TEST_NAMESPACE,
-                  HelixRestNamespace.HelixMetadataStoreType.ZOOKEEPER, _zkAddrTestNS, false));
-              // Add default namesapce
-              namespaces.add(new HelixRestNamespace(ZK_ADDR));
-              try {
-                _helixRestServer =
-                    new HelixRestServer(namespaces, baseUri.getPort(), baseUri.getPath(),
-                        Collections.singletonList(_auditLogger));
-                _helixRestServer.start();
-              } catch (Exception ex) {
-                throw new TestContainerException(ex);
-              }
+              _helixRestServer = startRestServer();
             }
           }
 
@@ -584,4 +570,28 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
     _clusters.add(STOPPABLE_CLUSTER);
     _workflowMap.put(STOPPABLE_CLUSTER, createWorkflows(STOPPABLE_CLUSTER, 3));
   }
+
+  /**
+   * Starts a HelixRestServer for the test suite.
+   * @return
+   */
+  protected HelixRestServer startRestServer() {
+    // Create namespace manifest map
+    List<HelixRestNamespace> namespaces = new ArrayList<>();
+    // Add test namespace
+    namespaces.add(new HelixRestNamespace(TEST_NAMESPACE,
+        HelixRestNamespace.HelixMetadataStoreType.ZOOKEEPER, _zkAddrTestNS, false));
+    // Add default namesapce
+    namespaces.add(new HelixRestNamespace(ZK_ADDR));
+    HelixRestServer server;
+    try {
+      server =
+          new HelixRestServer(namespaces, getBaseUri().getPort(), getBaseUri().getPath(),
+              Collections.singletonList(_auditLogger));
+      server.start();
+    } catch (Exception ex) {
+      throw new TestContainerException(ex);
+    }
+    return server;
+  }
 }
diff --git a/metadata-store-directory-common/src/main/java/org/apache/helix/msdcommon/constant/MetadataStoreRoutingConstants.java b/metadata-store-directory-common/src/main/java/org/apache/helix/msdcommon/constant/MetadataStoreRoutingConstants.java
index 41d5011..6cceb50 100644
--- a/metadata-store-directory-common/src/main/java/org/apache/helix/msdcommon/constant/MetadataStoreRoutingConstants.java
+++ b/metadata-store-directory-common/src/main/java/org/apache/helix/msdcommon/constant/MetadataStoreRoutingConstants.java
@@ -22,6 +22,8 @@ package org.apache.helix.msdcommon.constant;
 public class MetadataStoreRoutingConstants {
   public static final String ROUTING_DATA_PATH = "/METADATA_STORE_ROUTING_DATA";
 
+  public static final String ROUTING_ZK_ADDRESS_KEY = "ROUTING_ZK_ADDRESS";
+
   // For ZK only
   public static final String ZNRECORD_LIST_FIELD_KEY = "ZK_PATH_SHARDING_KEYS";
 
diff --git a/metadata-store-directory-common/src/main/java/org/apache/helix/msdcommon/util/ZkValidationUtil.java b/metadata-store-directory-common/src/main/java/org/apache/helix/msdcommon/util/ZkValidationUtil.java
index 472b3d9..ab8258d 100644
--- a/metadata-store-directory-common/src/main/java/org/apache/helix/msdcommon/util/ZkValidationUtil.java
+++ b/metadata-store-directory-common/src/main/java/org/apache/helix/msdcommon/util/ZkValidationUtil.java
@@ -27,12 +27,13 @@ public class ZkValidationUtil {
    * /
    * /abc
    * /abc/abc/abc/abc
+   * /abc/localhost:1234
    * Invalid matches:
    * null or empty string
    * /abc/
    * /abc/abc/abc/abc/
    **/
   public static boolean isPathValid(String path) {
-    return path.matches("^/|(/[\\w-]+)+$");
+    return path.matches("^/|(/[\\w?:-]+)+$");
   }
 }
diff --git a/pom.xml b/pom.xml
index e8c9bd3..60f8d87 100644
--- a/pom.xml
+++ b/pom.xml
@@ -630,40 +630,6 @@ under the License.
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-surefire-plugin</artifactId>
           <version>3.0.0-M3</version>
-          <executions>
-            <!--
-            "executions" enables multiple runs of integration test suites. This is to enable two
-            runs: 1. run in a single-ZK environment, 2. run in a multi-ZK environment.
-            "multiZk" is the config accessible via Systems.Properties so that the two runs could be
-            differentiated.
-            -->
-            <execution>
-              <goals>
-                <goal>test</goal>
-              </goals>
-              <id>default-test</id>
-              <phase>test</phase>
-              <configuration>
-                <rerunFailingTestsCount>3</rerunFailingTestsCount>
-                <skipAfterFailureCount>10</skipAfterFailureCount>
-              </configuration>
-            </execution>
-            <execution>
-              <goals>
-                <goal>test</goal>
-              </goals>
-              <id>multi-zk</id>
-              <phase>test</phase>
-              <configuration>
-                <systemPropertyVariables>
-                  <multiZk>true</multiZk>
-                  <numZk>3</numZk>
-                </systemPropertyVariables>
-                <rerunFailingTestsCount>3</rerunFailingTestsCount>
-                <skipAfterFailureCount>10</skipAfterFailureCount>
-              </configuration>
-            </execution>
-          </executions>
         </plugin>
         <plugin>
           <groupId>org.apache.rat</groupId>
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
index 0e461b7..ee8c8e3 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
 import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
 import org.apache.helix.zookeeper.exception.ZkClientException;
 import org.apache.helix.zookeeper.util.HttpRoutingDataReader;
@@ -579,4 +580,19 @@ public interface RealmAwareZkClient {
           .setConnectInitTimeout(_connectInitTimeout);
     }
   }
+
+  /**
+   * Subscribes to the routing data paths using the provided ZkClient.
+   * Note: this method assumes that the routing data path has already been created.
+   * @param childListener
+   * @param dataListener
+   */
+  default void subscribeRoutingDataChanges(IZkChildListener childListener,
+      IZkDataListener dataListener) {
+    subscribeChildChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, childListener);
+    for (String child : getChildren(MetadataStoreRoutingConstants.ROUTING_DATA_PATH)) {
+      subscribeDataChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + child,
+          dataListener);
+    }
+  }
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/util/HttpRoutingDataReader.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/util/HttpRoutingDataReader.java
index f2f907a..b214cc4 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/util/HttpRoutingDataReader.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/util/HttpRoutingDataReader.java
@@ -87,7 +87,7 @@ public class HttpRoutingDataReader {
       synchronized (HttpRoutingDataReader.class) {
         rawRoutingData = _rawRoutingDataMap.get(msdsEndpoint);
         if (rawRoutingData == null) {
-          String routingDataJson = getAllRoutingData();
+          String routingDataJson = getAllRoutingData(msdsEndpoint);
           // Update the reference if reading routingData over HTTP is successful
           rawRoutingData = parseRoutingData(routingDataJson);
           _rawRoutingDataMap.put(msdsEndpoint, rawRoutingData);
@@ -136,16 +136,24 @@ public class HttpRoutingDataReader {
   }
 
   /**
+   * Clears the statically-cached routing data in HttpRoutingDataReader.
+   */
+  public static void reset() {
+    _rawRoutingDataMap.clear();
+    _metadataStoreRoutingDataMap.clear();
+  }
+
+  /**
    * Makes an HTTP call to fetch all routing data.
    * @return
    * @throws IOException
    */
-  private static String getAllRoutingData() throws IOException {
+  private static String getAllRoutingData(String msdsEndpoint) throws IOException {
     // Note that MSDS_ENDPOINT should provide high-availability - it risks becoming a single point
     // of failure if it's backed by a single IP address/host
     // Retry count is 3 by default.
     HttpGet requestAllData = new HttpGet(
-        SYSTEM_MSDS_ENDPOINT + MetadataStoreRoutingConstants.MSDS_GET_ALL_ROUTING_DATA_ENDPOINT);
+        msdsEndpoint + MetadataStoreRoutingConstants.MSDS_GET_ALL_ROUTING_DATA_ENDPOINT);
 
     // Define timeout configs
     RequestConfig config = RequestConfig.custom().setConnectTimeout(HTTP_TIMEOUT_IN_MS)
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java
index 900c79f..acb2299 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java
@@ -43,7 +43,7 @@ public abstract class RealmAwareZkClientTestBase extends ZkTestBase {
 
   @BeforeClass
   public void beforeClass() throws IOException, InvalidRoutingDataException {
-    // Create a mock MSDS so that HttpRoudingDataReader could fetch the routing data
+    // Create a mock MSDS so that HttpRoutingDataReader could fetch the routing data
     if (_msdsServer == null) {
       // Do not create again if Mock MSDS server has already been created by other tests
       _msdsServer = new MockMetadataStoreDirectoryServer(MSDS_HOSTNAME, MSDS_PORT, MSDS_NAMESPACE,