You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/08/14 18:14:02 UTC

[helix] 07/12: Modify realm-aware ZkClient and Helix API for configurable routing source

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 6014df6ddc89cb62163b8448cc408075b1691dc4
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Fri Jul 24 15:01:04 2020 -0700

    Modify realm-aware ZkClient and Helix API for configurable routing source
    
    This commit changes old MSDS-based interfaces and replaces them with a more generic configurable routing data source interfaces. This commit also adds test cases for Helix API.
---
 .../main/java/org/apache/helix/ConfigAccessor.java |  7 +-
 .../manager/zk/GenericBaseDataAccessorBuilder.java |  3 +-
 .../helix/manager/zk/GenericZkHelixApiBuilder.java | 21 +++---
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  | 14 ++--
 .../java/org/apache/helix/manager/zk/ZKUtil.java   |  3 +-
 .../helix/manager/zk/ZkBaseDataAccessor.java       |  7 +-
 .../helix/manager/zk/ZkBucketDataAccessor.java     |  2 +-
 .../java/org/apache/helix/tools/ClusterSetup.java  |  8 +-
 .../multizk/TestMultiZkHelixJavaApis.java          | 88 ++++++++++++++++++++--
 .../apache/helix/rest/server/ServerContext.java    |  5 +-
 .../zookeeper/api/client/RealmAwareZkClient.java   | 44 ++++++++---
 .../zookeeper/constant/RoutingDataReaderType.java  | 14 ++++
 .../zookeeper/impl/client/DedicatedZkClient.java   | 18 ++---
 .../zookeeper/impl/client/FederatedZkClient.java   | 17 ++---
 .../zookeeper/impl/client/SharedZkClient.java      | 17 ++---
 15 files changed, 183 insertions(+), 85 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
index 48bcfd4..4885f31 100644
--- a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
@@ -19,7 +19,6 @@ package org.apache.helix;
  * under the License.
  */
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -30,8 +29,8 @@ import java.util.TreeMap;
 
 import org.apache.helix.manager.zk.GenericZkHelixApiBuilder;
 import org.apache.helix.manager.zk.ZKUtil;
-import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.CloudConfig;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ConfigScope;
 import org.apache.helix.model.CustomizedStateConfig;
 import org.apache.helix.model.HelixConfigScope;
@@ -40,8 +39,8 @@ import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.RESTConfig;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.helix.util.HelixUtil;
 import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
+import org.apache.helix.util.HelixUtil;
 import org.apache.helix.util.StringTemplate;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
@@ -120,7 +119,7 @@ public class ConfigAccessor {
             new RealmAwareZkClient.RealmAwareZkClientConfig()
                 .setZkSerializer(new ZNRecordSerializer()));
         return;
-      } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+      } catch (InvalidRoutingDataException | IllegalStateException e) {
         throw new HelixException("Failed to create ConfigAccessor!", e);
       }
     }
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/GenericBaseDataAccessorBuilder.java b/helix-core/src/main/java/org/apache/helix/manager/zk/GenericBaseDataAccessorBuilder.java
index 054e693..a75c8cf 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/GenericBaseDataAccessorBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/GenericBaseDataAccessorBuilder.java
@@ -19,7 +19,6 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
-import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.helix.HelixException;
@@ -83,7 +82,7 @@ public class GenericBaseDataAccessorBuilder<B extends GenericBaseDataAccessorBui
       case MULTI_REALM:
         try {
           zkClient = new FederatedZkClient(connectionConfig, clientConfig);
-        } catch (IOException | InvalidRoutingDataException e) {
+        } catch (InvalidRoutingDataException e) {
           throw new HelixException("Not able to connect on multi-realm mode.", e);
         }
         break;
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/GenericZkHelixApiBuilder.java b/helix-core/src/main/java/org/apache/helix/manager/zk/GenericZkHelixApiBuilder.java
index 840ec8f..d02f67e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/GenericZkHelixApiBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/GenericZkHelixApiBuilder.java
@@ -19,8 +19,6 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
-import java.io.IOException;
-
 import org.apache.helix.HelixException;
 import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
 import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
@@ -109,7 +107,7 @@ public abstract class GenericZkHelixApiBuilder<B extends GenericZkHelixApiBuilde
         try {
           _zkAddress = resolveZkAddressWithShardingKey(_realmAwareZkConnectionConfig);
           isZkAddressSet = true;
-        } catch (IOException | InvalidRoutingDataException e) {
+        } catch (InvalidRoutingDataException e) {
           LOG.warn(
               "GenericZkHelixApiBuilder: ZkAddress is not set and failed to resolve ZkAddress with ZK path sharding key!",
               e);
@@ -166,7 +164,7 @@ public abstract class GenericZkHelixApiBuilder<B extends GenericZkHelixApiBuilde
         try {
           return new FederatedZkClient(connectionConfig,
               clientConfig.setZkSerializer(new ZNRecordSerializer()));
-        } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+        } catch (InvalidRoutingDataException | IllegalStateException e) {
           throw new HelixException("GenericZkHelixApiBuilder: Failed to create FederatedZkClient!",
               e);
         }
@@ -197,17 +195,18 @@ public abstract class GenericZkHelixApiBuilder<B extends GenericZkHelixApiBuilde
    * ZK address is not given in this Builder.
    * @param connectionConfig
    * @return
-   * @throws IOException
    * @throws InvalidRoutingDataException
    */
   private String resolveZkAddressWithShardingKey(
       RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig)
-      throws IOException, InvalidRoutingDataException {
-    boolean isMsdsEndpointSet =
-        connectionConfig.getMsdsEndpoint() != null && !connectionConfig.getMsdsEndpoint().isEmpty();
-    // TODO: Make RoutingDataReaderType configurable
-    MetadataStoreRoutingData routingData = isMsdsEndpointSet ? RoutingDataManager
-        .getMetadataStoreRoutingData(RoutingDataReaderType.HTTP, connectionConfig.getMsdsEndpoint())
+      throws InvalidRoutingDataException {
+    boolean isRoutingDataSourceEndpointSet =
+        connectionConfig.getRoutingDataSourceEndpoint() != null && !connectionConfig
+            .getRoutingDataSourceEndpoint().isEmpty();
+    MetadataStoreRoutingData routingData = isRoutingDataSourceEndpointSet ? RoutingDataManager
+        .getMetadataStoreRoutingData(
+            RoutingDataReaderType.lookUp(connectionConfig.getRoutingDataSourceType()),
+            connectionConfig.getRoutingDataSourceEndpoint())
         : RoutingDataManager.getMetadataStoreRoutingData();
     return routingData.getMetadataStoreRealm(connectionConfig.getZkRealmShardingKey());
   }
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 348f8d8..1759116 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -151,7 +151,7 @@ public class ZKHelixAdmin implements HelixAdmin {
       try {
         zkClient = new FederatedZkClient(
             new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(), clientConfig);
-      } catch (IllegalStateException | IOException | InvalidRoutingDataException e) {
+      } catch (IllegalStateException | InvalidRoutingDataException e) {
         throw new HelixException("Not able to connect on multi-realm mode.", e);
       }
     } else {
@@ -965,13 +965,15 @@ public class ZKHelixAdmin implements HelixAdmin {
         || _zkClient instanceof FederatedZkClient) {
       // If on multi-zk mode, we retrieve cluster information from Metadata Store Directory Service.
       Map<String, List<String>> realmToShardingKeys;
-      String msdsEndpoint = _zkClient.getRealmAwareZkConnectionConfig().getMsdsEndpoint();
-      if (msdsEndpoint == null || msdsEndpoint.isEmpty()) {
+      String routingDataSourceEndpoint =
+          _zkClient.getRealmAwareZkConnectionConfig().getRoutingDataSourceEndpoint();
+      if (routingDataSourceEndpoint == null || routingDataSourceEndpoint.isEmpty()) {
+        // If endpoint is not given explicitly, use HTTP and the endpoint set in System Properties
         realmToShardingKeys = RoutingDataManager.getRawRoutingData();
       } else {
-        // TODO: Make RoutingDataReaderType configurable
-        realmToShardingKeys =
-            RoutingDataManager.getRawRoutingData(RoutingDataReaderType.HTTP, msdsEndpoint);
+        realmToShardingKeys = RoutingDataManager.getRawRoutingData(RoutingDataReaderType
+                .lookUp(_zkClient.getRealmAwareZkConnectionConfig().getRoutingDataSourceType()),
+            routingDataSourceEndpoint);
       }
 
       if (realmToShardingKeys == null || realmToShardingKeys.isEmpty()) {
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
index cda5f39..1015834 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
@@ -19,7 +19,6 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -622,7 +621,7 @@ public final class ZKUtil {
         RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
             new RealmAwareZkClient.RealmAwareZkClientConfig();
         return new FederatedZkClient(connectionConfig, clientConfig);
-      } catch (IllegalArgumentException | IOException | InvalidRoutingDataException e) {
+      } catch (IllegalArgumentException | InvalidRoutingDataException e) {
         throw new HelixException("Not able to connect on realm-aware mode", e);
       }
     }
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index d2096b0..4e6a7b0 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -19,7 +19,6 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -40,6 +39,7 @@ import org.apache.helix.util.HelixUtil;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.apache.helix.zookeeper.exception.ZkClientException;
 import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
 import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
@@ -52,9 +52,8 @@ import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
-import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
 import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
-import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.data.Stat;
@@ -1342,7 +1341,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
       try {
         return new FederatedZkClient(
             new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(), clientConfig);
-      } catch (IllegalStateException | IOException | InvalidRoutingDataException e) {
+      } catch (IllegalStateException | InvalidRoutingDataException e) {
         throw new HelixException("Not able to connect on multi-realm mode.", e);
       }
     }
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
index 8f0aa16..efdde81 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
@@ -94,7 +94,7 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
         RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
             new RealmAwareZkClient.RealmAwareZkClientConfig();
         _zkClient = new FederatedZkClient(connectionConfig, clientConfig);
-      } catch (IllegalArgumentException | IOException | InvalidRoutingDataException e) {
+      } catch (IllegalArgumentException | InvalidRoutingDataException e) {
         throw new HelixException("Not able to connect on realm-aware mode", e);
       }
     } else {
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 59180eb..667cab7 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -40,12 +40,10 @@ import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixException;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.helix.cloud.azure.AzureConstants;
-import org.apache.helix.cloud.constants.CloudProvider;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.cloud.azure.AzureConstants;
+import org.apache.helix.cloud.constants.CloudProvider;
 import org.apache.helix.manager.zk.GenericZkHelixApiBuilder;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -162,7 +160,7 @@ public class ClusterSetup {
             new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(),
             new RealmAwareZkClient.RealmAwareZkClientConfig()
                 .setZkSerializer(new ZNRecordSerializer()));
-      } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+      } catch (InvalidRoutingDataException | IllegalStateException e) {
         throw new HelixException("Failed to create ConfigAccessor!", e);
       }
     } else {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
index 5146ce6..d72318a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
@@ -20,6 +20,7 @@ package org.apache.helix.integration.multizk;
  */
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -73,11 +74,14 @@ import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.api.client.ZkClientType;
+import org.apache.helix.zookeeper.constant.RoutingDataReaderType;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.exception.MultiZkException;
 import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
 import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
 import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
+import org.apache.helix.zookeeper.routing.RoutingDataManager;
 import org.apache.helix.zookeeper.zkclient.ZkServer;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -109,16 +113,17 @@ public class TestMultiZkHelixJavaApis {
   private HelixAdmin _zkHelixAdmin;
 
   // Save System property configs from before this test and pass onto after the test
-  private Map<String, String> _configStore = new HashMap<>();
+  private final Map<String, String> _configStore = new HashMap<>();
+
+  private static final String ZK_PREFIX = "localhost:";
+  private static final int ZK_START_PORT = 8777;
+  private String _msdsEndpoint;
 
   @BeforeClass
   public void beforeClass() throws Exception {
     // Create 3 in-memory zookeepers and routing mapping
-    final String zkPrefix = "localhost:";
-    final int zkStartPort = 8777;
-
     for (int i = 0; i < NUM_ZK; i++) {
-      String zkAddress = zkPrefix + (zkStartPort + i);
+      String zkAddress = ZK_PREFIX + (ZK_START_PORT + i);
       ZK_SERVER_MAP.put(zkAddress, TestHelper.startZkServer(zkAddress));
       ZK_CLIENT_MAP.put(zkAddress, DedicatedZkClientFactory.getInstance()
           .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
@@ -132,6 +137,8 @@ public class TestMultiZkHelixJavaApis {
     final String msdsHostName = "localhost";
     final int msdsPort = 11117;
     final String msdsNamespace = "multiZkTest";
+    _msdsEndpoint =
+        "http://" + msdsHostName + ":" + msdsPort + "/admin/v2/namespaces/" + msdsNamespace;
     _msds = new MockMetadataStoreDirectoryServer(msdsHostName, msdsPort, msdsNamespace,
         _rawRoutingData);
     _msds.startServer();
@@ -151,8 +158,7 @@ public class TestMultiZkHelixJavaApis {
     // Turn on multiZk mode in System config
     System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, "true");
     // MSDS endpoint: http://localhost:11117/admin/v2/namespaces/multiZkTest
-    System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY,
-        "http://" + msdsHostName + ":" + msdsPort + "/admin/v2/namespaces/" + msdsNamespace);
+    System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY, _msdsEndpoint);
 
     // Create a FederatedZkClient for admin work
     _zkClient =
@@ -782,7 +788,8 @@ public class TestMultiZkHelixJavaApis {
         new MockMetadataStoreDirectoryServer("localhost", 11118, "multiZkTest", secondRoutingData);
     final RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
         new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
-            .setMsdsEndpoint(secondMsds.getEndpoint()).build();
+            .setRoutingDataSourceType(RoutingDataReaderType.HTTP.name())
+            .setRoutingDataSourceEndpoint(secondMsds.getEndpoint()).build();
     secondMsds.startServer();
 
     try {
@@ -996,4 +1003,69 @@ public class TestMultiZkHelixJavaApis {
     }
     return sb.toString();
   }
+
+  /**
+   * Testing using ZK as the routing data source. We use BaseDataAccessor as the representative
+   * Helix API.
+   * Two modes are tested: ZK and HTTP-ZK fallback
+   */
+  @Test(dependsOnMethods = "testDifferentMsdsEndpointConfigs")
+  public void testZkRoutingDataSourceConfigs() {
+    // Set up routing data in ZK by connecting directly to ZK
+    BaseDataAccessor<ZNRecord> accessor =
+        new ZkBaseDataAccessor.Builder<ZNRecord>().setZkAddress(ZK_PREFIX + ZK_START_PORT).build();
+
+    // Create ZK realm routing data ZNRecord
+    _rawRoutingData.forEach((realm, keys) -> {
+      ZNRecord znRecord = new ZNRecord(realm);
+      znRecord.setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY,
+          new ArrayList<>(keys));
+      accessor.set(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realm, znRecord,
+          AccessOption.PERSISTENT);
+    });
+
+    // Create connection configs with the source type set to each type
+    final RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder =
+        new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
+    final RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfigZk =
+        connectionConfigBuilder.setRoutingDataSourceType(RoutingDataReaderType.ZK.name())
+            .setRoutingDataSourceEndpoint(ZK_PREFIX + ZK_START_PORT).build();
+    final RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfigHttpZkFallback =
+        connectionConfigBuilder
+            .setRoutingDataSourceType(RoutingDataReaderType.HTTP_ZK_FALLBACK.name())
+            .setRoutingDataSourceEndpoint(_msdsEndpoint + "," + ZK_PREFIX + ZK_START_PORT).build();
+    final RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfigHttp =
+        connectionConfigBuilder.setRoutingDataSourceType(RoutingDataReaderType.HTTP.name())
+            .setRoutingDataSourceEndpoint(_msdsEndpoint).build();
+
+    // Reset cached routing data
+    RoutingDataManager.reset();
+    // Shutdown MSDS to ensure that these accessors are able to pull routing data from ZK
+    _msds.stopServer();
+
+    // Create a BaseDataAccessor instance with the connection config
+    BaseDataAccessor<ZNRecord> zkBasedAccessor = new ZkBaseDataAccessor.Builder<ZNRecord>()
+        .setRealmAwareZkConnectionConfig(connectionConfigZk).build();
+    BaseDataAccessor<ZNRecord> httpZkFallbackBasedAccessor =
+        new ZkBaseDataAccessor.Builder<ZNRecord>()
+            .setRealmAwareZkConnectionConfig(connectionConfigHttpZkFallback).build();
+    try {
+      BaseDataAccessor<ZNRecord> httpBasedAccessor = new ZkBaseDataAccessor.Builder<ZNRecord>()
+          .setRealmAwareZkConnectionConfig(connectionConfigHttp).build();
+      Assert.fail("Must fail with a MultiZkException because HTTP connection will be refused.");
+    } catch (MultiZkException e) {
+      // Okay
+    }
+
+    // Check that all clusters appear as existing to this accessor
+    CLUSTER_LIST.forEach(cluster -> {
+      Assert.assertTrue(zkBasedAccessor.exists("/" + cluster, AccessOption.PERSISTENT));
+      Assert.assertTrue(httpZkFallbackBasedAccessor.exists("/" + cluster, AccessOption.PERSISTENT));
+    });
+
+    // Close all connections
+    accessor.close();
+    zkBasedAccessor.close();
+    httpZkFallbackBasedAccessor.close();
+  }
 }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
index f5d8915..176180f 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
@@ -20,7 +20,6 @@ package org.apache.helix.rest.server;
  * under the License.
  */
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -134,13 +133,13 @@ public class ServerContext implements IZkDataListener, IZkChildListener, IZkStat
                   new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
               // If MSDS endpoint is set for this namespace, use that instead.
               if (_msdsEndpoint != null && !_msdsEndpoint.isEmpty()) {
-                connectionConfigBuilder.setMsdsEndpoint(_msdsEndpoint);
+                connectionConfigBuilder.setRoutingDataSourceEndpoint(_msdsEndpoint);
               }
               _zkClient = new FederatedZkClient(connectionConfigBuilder.build(),
                   new RealmAwareZkClient.RealmAwareZkClientConfig()
                       .setZkSerializer(new ZNRecordSerializer()));
               LOG.info("ServerContext: FederatedZkClient created successfully!");
-            } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+            } catch (InvalidRoutingDataException | IllegalStateException e) {
               throw new HelixException("Failed to create FederatedZkClient!", e);
             }
           } else {
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
index b1ecc38..e8c91f1 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
+import org.apache.helix.zookeeper.constant.RoutingDataReaderType;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
 import org.apache.helix.zookeeper.zkclient.IZkChildListener;
 import org.apache.helix.zookeeper.zkclient.IZkDataListener;
@@ -56,8 +57,7 @@ public interface RealmAwareZkClient {
    * MULTI_REALM: CRUD and change subscription are supported. Operations involving EPHEMERAL CreateMode will throw an UnsupportedOperationException.
    */
   enum RealmMode {
-    SINGLE_REALM,
-    MULTI_REALM
+    SINGLE_REALM, MULTI_REALM
   }
 
   int DEFAULT_OPERATION_TIMEOUT = Integer.MAX_VALUE;
@@ -65,6 +65,7 @@ public interface RealmAwareZkClient {
   int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
 
   // listener subscription
+
   /**
    * Subscribe the path and the listener will handle child events of the path.
    * Add exists watch to path if the path does not exist in ZooKeeper server.
@@ -86,7 +87,8 @@ public interface RealmAwareZkClient {
    * @return ChildrentSubsribeResult. If the path does not exists, the isInstalled field
    * is false. Otherwise, it is true and list of children are returned.
    */
-  ChildrenSubscribeResult subscribeChildChanges(String path, IZkChildListener listener, boolean skipWatchingNonExistNode);
+  ChildrenSubscribeResult subscribeChildChanges(String path, IZkChildListener listener,
+      boolean skipWatchingNonExistNode);
 
   void unsubscribeChildChanges(String path, IZkChildListener listener);
 
@@ -108,7 +110,8 @@ public interface RealmAwareZkClient {
    * @param skipWatchingNonExistNode True means not installing any watch if path does not exist.
    * return True if installation of watch succeed. Otherwise, return false.
    */
-  boolean subscribeDataChanges(String path, IZkDataListener listener, boolean skipWatchingNonExistNode);
+  boolean subscribeDataChanges(String path, IZkDataListener listener,
+      boolean skipWatchingNonExistNode);
 
   void unsubscribeDataChanges(String path, IZkDataListener listener);
 
@@ -378,12 +381,14 @@ public interface RealmAwareZkClient {
      * NOTE: this field will be ignored if RealmMode is MULTI_REALM!
      */
     private String _zkRealmShardingKey;
-    private String _msdsEndpoint;
+    private String _routingDataSourceType;
+    private String _routingDataSourceEndpoint;
     private int _sessionTimeout = DEFAULT_SESSION_TIMEOUT;
 
     private RealmAwareZkConnectionConfig(Builder builder) {
       _zkRealmShardingKey = builder._zkRealmShardingKey;
-      _msdsEndpoint = builder._msdsEndpoint;
+      _routingDataSourceType = builder._routingDataSourceType;
+      _routingDataSourceEndpoint = builder._routingDataSourceEndpoint;
       _sessionTimeout = builder._sessionTimeout;
     }
 
@@ -424,14 +429,19 @@ public interface RealmAwareZkClient {
       return _sessionTimeout;
     }
 
-    public String getMsdsEndpoint() {
-      return _msdsEndpoint;
+    public String getRoutingDataSourceType() {
+      return _routingDataSourceType;
+    }
+
+    public String getRoutingDataSourceEndpoint() {
+      return _routingDataSourceEndpoint;
     }
 
     public static class Builder {
       private RealmMode _realmMode;
       private String _zkRealmShardingKey;
-      private String _msdsEndpoint;
+      private String _routingDataSourceType;
+      private String _routingDataSourceEndpoint;
       private int _sessionTimeout = DEFAULT_SESSION_TIMEOUT;
 
       public Builder() {
@@ -453,8 +463,13 @@ public interface RealmAwareZkClient {
         return this;
       }
 
-      public Builder setMsdsEndpoint(String msdsEndpoint) {
-        _msdsEndpoint = msdsEndpoint;
+      public Builder setRoutingDataSourceType(String routingDataSourceType) {
+        _routingDataSourceType = routingDataSourceType;
+        return this;
+      }
+
+      public Builder setRoutingDataSourceEndpoint(String routingDataSourceEndpoint) {
+        _routingDataSourceEndpoint = routingDataSourceEndpoint;
         return this;
       }
 
@@ -482,6 +497,13 @@ public interface RealmAwareZkClient {
           throw new IllegalArgumentException(
               "RealmAwareZkConnectionConfig.Builder: ZK sharding key must be set on single-realm mode!");
         }
+        if ((_routingDataSourceEndpoint == null && _routingDataSourceType != null) || (
+            _routingDataSourceEndpoint != null && _routingDataSourceType == null)) {
+          // For routing data source type and endpoint, if one is set and not the other, it is invalid
+          throw new IllegalArgumentException(
+              "RealmAwareZkConnectionConfig.Builder: routing data source type and endpoint are not configured properly! Type: "
+                  + _routingDataSourceType + " Endpoint: " + _routingDataSourceEndpoint);
+        }
       }
     }
   }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingDataReaderType.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingDataReaderType.java
index aedef36..33b6c70 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingDataReaderType.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingDataReaderType.java
@@ -19,6 +19,10 @@ package org.apache.helix.zookeeper.constant;
  * under the License.
  */
 
+import org.apache.commons.lang3.EnumUtils;
+import org.apache.helix.zookeeper.exception.MultiZkException;
+
+
 /**
  * RoutingDataReaderType is an enum that designates the reader type and the class name that can be
  * used to create an instance of RoutingDataReader by reflection.
@@ -37,4 +41,14 @@ public enum RoutingDataReaderType {
   public String getClassName() {
     return this.className;
   }
+
+  public static RoutingDataReaderType lookUp(String enumString) {
+    RoutingDataReaderType type =
+        EnumUtils.getEnumIgnoreCase(RoutingDataReaderType.class, enumString);
+    if (type == null) {
+      throw new MultiZkException(
+          "RoutingDataReaderType::lookUp: Unable to find the enum! String given: " + enumString);
+    }
+    return type;
+  }
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java
index 501670c..dbe64f3 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java
@@ -19,7 +19,6 @@ package org.apache.helix.zookeeper.impl.client;
  * under the License.
  */
 
-import java.io.IOException;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.TimeUnit;
@@ -69,12 +68,10 @@ public class DedicatedZkClient implements RealmAwareZkClient {
    * such as CRUD, change callback, and ephemeral operations for a single ZkRealmShardingKey.
    * @param connectionConfig
    * @param clientConfig
-   * @throws IOException
    * @throws InvalidRoutingDataException
    */
   public DedicatedZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
-      RealmAwareZkClient.RealmAwareZkClientConfig clientConfig)
-      throws IOException, InvalidRoutingDataException {
+      RealmAwareZkClient.RealmAwareZkClientConfig clientConfig) throws InvalidRoutingDataException {
     if (connectionConfig == null) {
       throw new IllegalArgumentException("RealmAwareZkConnectionConfig cannot be null!");
     }
@@ -84,14 +81,15 @@ public class DedicatedZkClient implements RealmAwareZkClient {
     _connectionConfig = connectionConfig;
     _clientConfig = clientConfig;
 
-    // Get the routing data from a static Singleton HttpRoutingDataReader
-    String msdsEndpoint = connectionConfig.getMsdsEndpoint();
-    if (msdsEndpoint == null || msdsEndpoint.isEmpty()) {
+    // Get MetadataStoreRoutingData
+    String routingDataSourceEndpoint = connectionConfig.getRoutingDataSourceEndpoint();
+    if (routingDataSourceEndpoint == null || routingDataSourceEndpoint.isEmpty()) {
+      // If endpoint is not given explicitly, use HTTP and the endpoint set in System Properties
       _metadataStoreRoutingData = RoutingDataManager.getMetadataStoreRoutingData();
     } else {
-      // TODO: Make RoutingDataReaderType configurable
-      _metadataStoreRoutingData =
-          RoutingDataManager.getMetadataStoreRoutingData(RoutingDataReaderType.HTTP, msdsEndpoint);
+      _metadataStoreRoutingData = RoutingDataManager.getMetadataStoreRoutingData(
+          RoutingDataReaderType.lookUp(connectionConfig.getRoutingDataSourceType()),
+          routingDataSourceEndpoint);
     }
 
     _zkRealmShardingKey = connectionConfig.getZkRealmShardingKey();
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
index 22b9fe5..01e7fc7 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
@@ -19,7 +19,6 @@ package org.apache.helix.zookeeper.impl.client;
  * under the License.
  */
 
-import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -87,8 +86,7 @@ public class FederatedZkClient implements RealmAwareZkClient {
 
   // TODO: support capacity of ZkClient number in one FederatedZkClient and do garbage collection.
   public FederatedZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
-      RealmAwareZkClient.RealmAwareZkClientConfig clientConfig)
-      throws IOException, InvalidRoutingDataException {
+      RealmAwareZkClient.RealmAwareZkClientConfig clientConfig) throws InvalidRoutingDataException {
     if (connectionConfig == null) {
       throw new IllegalArgumentException("RealmAwareZkConnectionConfig cannot be null!");
     }
@@ -96,14 +94,15 @@ public class FederatedZkClient implements RealmAwareZkClient {
       throw new IllegalArgumentException("RealmAwareZkClientConfig cannot be null!");
     }
 
-    // Attempt to get MetadataStoreRoutingData
-    String msdsEndpoint = connectionConfig.getMsdsEndpoint();
-    if (msdsEndpoint == null || msdsEndpoint.isEmpty()) {
+    // Get MetadataStoreRoutingData
+    String routingDataSourceEndpoint = connectionConfig.getRoutingDataSourceEndpoint();
+    if (routingDataSourceEndpoint == null || routingDataSourceEndpoint.isEmpty()) {
+      // If endpoint is not given explicitly, use HTTP and the endpoint set in System Properties
       _metadataStoreRoutingData = RoutingDataManager.getMetadataStoreRoutingData();
     } else {
-      // TODO: Make RoutingDataReaderType configurable
-      _metadataStoreRoutingData =
-          RoutingDataManager.getMetadataStoreRoutingData(RoutingDataReaderType.HTTP, msdsEndpoint);
+      _metadataStoreRoutingData = RoutingDataManager.getMetadataStoreRoutingData(
+          RoutingDataReaderType.lookUp(connectionConfig.getRoutingDataSourceType()),
+          routingDataSourceEndpoint);
     }
 
     _isClosed = false;
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java
index 341731e..7c3a562 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java
@@ -19,7 +19,6 @@ package org.apache.helix.zookeeper.impl.client;
  * under the License.
  */
 
-import java.io.IOException;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.TimeUnit;
@@ -65,8 +64,7 @@ public class SharedZkClient implements RealmAwareZkClient {
   private final RealmAwareZkClient.RealmAwareZkClientConfig _clientConfig;
 
   public SharedZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
-      RealmAwareZkClient.RealmAwareZkClientConfig clientConfig)
-      throws IOException, InvalidRoutingDataException {
+      RealmAwareZkClient.RealmAwareZkClientConfig clientConfig) throws InvalidRoutingDataException {
     if (connectionConfig == null) {
       throw new IllegalArgumentException("RealmAwareZkConnectionConfig cannot be null!");
     }
@@ -76,14 +74,15 @@ public class SharedZkClient implements RealmAwareZkClient {
     _connectionConfig = connectionConfig;
     _clientConfig = clientConfig;
 
-    // Get the routing data from a static Singleton HttpRoutingDataReader
-    String msdsEndpoint = connectionConfig.getMsdsEndpoint();
-    if (msdsEndpoint == null || msdsEndpoint.isEmpty()) {
+    // Get MetadataStoreRoutingData
+    String routingDataSourceEndpoint = connectionConfig.getRoutingDataSourceEndpoint();
+    if (routingDataSourceEndpoint == null || routingDataSourceEndpoint.isEmpty()) {
+      // If endpoint is not given explicitly, use HTTP and the endpoint set in System Properties
       _metadataStoreRoutingData = RoutingDataManager.getMetadataStoreRoutingData();
     } else {
-      // TODO: Make RoutingDataReaderType configurable
-      _metadataStoreRoutingData =
-          RoutingDataManager.getMetadataStoreRoutingData(RoutingDataReaderType.HTTP, msdsEndpoint);
+      _metadataStoreRoutingData = RoutingDataManager.getMetadataStoreRoutingData(
+          RoutingDataReaderType.lookUp(connectionConfig.getRoutingDataSourceType()),
+          routingDataSourceEndpoint);
     }
 
     _zkRealmShardingKey = connectionConfig.getZkRealmShardingKey();