You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2023/04/13 17:04:43 UTC

[helix] branch master updated: Fix MultiZKConnectionConfig test failure. (#2442)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fac4b259 Fix MultiZKConnectionConfig test failure. (#2442)
8fac4b259 is described below

commit 8fac4b2594df3ae9846494f46afa0be4c16a1882
Author: Komal Desai <98...@users.noreply.github.com>
AuthorDate: Thu Apr 13 10:04:37 2023 -0700

    Fix MultiZKConnectionConfig test failure. (#2442)
    
    Theory - We test MultiZK configuration in two mode, dedicated and MSDS (Zooscalability).
    TestMultiZkHelixJavaApi - is focused on MSDS while ConnectionConfig is focused on dedicated ZK.
    
    Test is very unstable wrt "testHelixManager".
    If MSDS based tests are run prior to Dedicated ZK, the test fails. we wanted to make sure that Dedicated ZK testcases run prior to MSDS and validate test passes.
    
    Testing Done:
    Same test i ran in loop for 10 times:
    START TestMultiZkConnectionConfig_testZkRoutingDataSourceConfigs at Tue Apr 11 08:05:59 PDT 2023
    END TestMultiZkConnectionConfig_testZkRoutingDataSourceConfigs at Tue Apr 11 08:06:11 PDT 2023
     ======================================================================
     Attempt 10 org.apache.helix.integration.multizk.TestMultiZkConnectionConfig
     ======================================================================
    Start zookeeper at localhost:8977 in thread main
    Start zookeeper at localhost:8978 in thread main
    Start zookeeper at localhost:8979 in thread main
    
    Co-authored-by: Komal Desai <kd...@kdesai-mn1.linkedin.biz>
---
 .../multizk/TestMultiZkConnectionConfig.java       | 854 +++++++++++++++++++-
 .../multizk/TestMultiZkHelixJavaApis.java          | 866 ---------------------
 scripts/runSingleTest.sh                           |   2 +-
 3 files changed, 835 insertions(+), 887 deletions(-)

diff --git a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkConnectionConfig.java b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkConnectionConfig.java
index b353136cb..f33572c69 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkConnectionConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkConnectionConfig.java
@@ -18,34 +18,72 @@ package org.apache.helix.integration.multizk;
  * specific language governing permissions and limitations
  * under the License.
  */
-
+import java.io.IOException;
+import java.lang.IllegalStateException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.ArrayList;
+import java.util.NoSuchElementException;
 import java.util.Properties;
-import java.util.Date;
+import java.util.Set;
 
-import org.apache.helix.*;
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixCloudProperty;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.HelixManagerProperty;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.api.config.RebalanceConfig;
 import org.apache.helix.cloud.constants.CloudProvider;
+import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
+import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.integration.task.WorkflowGenerator;
 import org.apache.helix.manager.zk.HelixManagerStateListener;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKUtil;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.model.CloudConfig;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
+import org.apache.helix.msdcommon.mock.MockMetadataStoreDirectoryServer;
+import org.apache.helix.store.HelixPropertyStore;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowContext;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.api.client.ZkClientType;
 import org.apache.helix.zookeeper.constant.RoutingDataReaderType;
 import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.exception.MultiZkException;
 import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
+import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
 import org.apache.helix.zookeeper.routing.RoutingDataManager;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
@@ -64,6 +102,10 @@ public class TestMultiZkConnectionConfig extends MultiZkTestBase {
   protected ClusterSetup _clusterSetupBuilder;
   protected RealmAwareZkClient.RealmAwareZkConnectionConfig _invalidZkConnectionConfig;
   protected RealmAwareZkClient.RealmAwareZkConnectionConfig _validZkConnectionConfig;
+
+  // For testing different MSDS endpoint configs.
+  private static final String CLUSTER_ONE = CLUSTER_LIST.get(0);
+  private static final String CLUSTER_FOUR = "CLUSTER_4";
   private static String _className = TestHelper.getTestClassName();
 
   @BeforeClass
@@ -72,18 +114,26 @@ public class TestMultiZkConnectionConfig extends MultiZkTestBase {
     // Routing data may be set by other tests using the same endpoint; reset() for good measure
     RoutingDataManager.getInstance().reset(true);
     // Create a FederatedZkClient for admin work
-    try {
-      _zkClient =
-          new FederatedZkClient(new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
-              .setRoutingDataSourceEndpoint(_msdsEndpoint + "," + ZK_PREFIX + ZK_START_PORT)
-              .setRoutingDataSourceType(RoutingDataReaderType.HTTP_ZK_FALLBACK.name()).build(),
-              new RealmAwareZkClient.RealmAwareZkClientConfig());
-      _zkClient.setZkSerializer(new ZNRecordSerializer());
-    } catch (Exception ex) {
-      for (StackTraceElement elm : ex.getStackTrace()) {
-        System.out.println(elm);
-      }
-    }
+    _zkClient =
+        new FederatedZkClient(new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
+            .setRoutingDataSourceEndpoint(_msdsEndpoint + "," + ZK_PREFIX + ZK_START_PORT)
+            .setRoutingDataSourceType(RoutingDataReaderType.HTTP_ZK_FALLBACK.name()).build(),
+            new RealmAwareZkClient.RealmAwareZkClientConfig());
+    _zkClient.setZkSerializer(new ZNRecordSerializer());
+    System.out.println("end start");
+  }
+
+  public void beforeApiClass() throws Exception {
+    beforeClass();
+    // MSDS endpoint: http://localhost:11117/admin/v2/namespaces/multiZkTest
+    System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY, _msdsEndpoint);
+
+    // Routing data may be set by other tests using the same endpoint; reset() for good measure
+    RoutingDataManager.getInstance().reset(true);
+    // Create a FederatedZkClient for admin work
+    _zkClient =
+        new FederatedZkClient(new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(),
+            new RealmAwareZkClient.RealmAwareZkClientConfig());
     System.out.println("end start");
   }
 
@@ -259,7 +309,7 @@ public class TestMultiZkConnectionConfig extends MultiZkTestBase {
   /**
    * Test creation of HelixManager and makes sure it connects correctly.
    */
-  @Test(dependsOnMethods = "testCreateParticipants", enabled = false)
+  @Test(dependsOnMethods = "testCreateParticipants")
   public void testZKHelixManager() throws Exception {
     String methodName = TestHelper.getTestMethodName();
     System.out.println("START " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
@@ -326,7 +376,7 @@ public class TestMultiZkConnectionConfig extends MultiZkTestBase {
   /**
    * Test creation of HelixManager and makes sure it connects correctly.
    */
-  @Test(dependsOnMethods = "testZKHelixManager", enabled = false)
+  @Test(dependsOnMethods = "testZKHelixManager")
   public void testZKHelixManagerCloudConfig() throws Exception {
     String methodName = TestHelper.getTestMethodName();
     System.out.println("START " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
@@ -398,4 +448,768 @@ public class TestMultiZkConnectionConfig extends MultiZkTestBase {
 
     System.out.println("END " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
   }
+
+  private void setupApiCluster() {
+    // Create two ClusterSetups using two different constructors
+    // Note: ZK Address here could be anything because multiZk mode is on (it will be ignored)
+    _clusterSetupZkAddr = new ClusterSetup(ZK_SERVER_MAP.keySet().iterator().next());
+    _clusterSetupBuilder = new ClusterSetup.Builder().build();
+  }
+
+  private void createApiZkConnectionConfigs(String clusterName) {
+    RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder =
+        new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
+    // Try with a connection config without ZK realm sharding key set (should fail)
+    _invalidZkConnectionConfig = connectionConfigBuilder.build();
+    _validZkConnectionConfig = connectionConfigBuilder.setZkRealmShardingKey("/" + clusterName).build();
+  }
+
+  @Test (dependsOnMethods = "testZKHelixManagerCloudConfig")
+  public void testApiCreateClusters() throws Exception {
+    String methodName = TestHelper.getTestMethodName();
+    System.out.println("START " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
+    super.afterClass();
+
+    beforeApiClass();
+    setupApiCluster();
+
+    createClusters(_clusterSetupZkAddr);
+    verifyClusterCreation(_clusterSetupZkAddr);
+
+    createClusters(_clusterSetupBuilder);
+    verifyClusterCreation(_clusterSetupBuilder);
+
+    // Create clusters again to continue with testing
+    createClusters(_clusterSetupBuilder);
+
+    _clusterSetupZkAddr.close();
+    _clusterSetupBuilder.close();
+
+    System.out.println("END " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test(dependsOnMethods = "testApiCreateClusters")
+  public void testApiCreateParticipants() throws Exception {
+    String methodName = TestHelper.getTestMethodName();
+    System.out.println("START " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
+    testCreateParticipants();
+    System.out.println("END " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test(dependsOnMethods = "testApiCreateParticipants")
+  public void testApiZKHelixManager() throws Exception {
+    String methodName = TestHelper.getTestMethodName();
+    System.out.println("START " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
+    testZKHelixManager();
+    System.out.println("END " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test(dependsOnMethods = "testApiZKHelixManager")
+  public void testApiZKHelixManagerCloudConfig() throws Exception {
+    String methodName = TestHelper.getTestMethodName();
+    System.out.println("START " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
+    //todo
+    testZKHelixManagerCloudConfig();
+    System.out.println("END " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test(dependsOnMethods = "testApiZKHelixManager")
+  public void testZkUtil() {
+    String methodName = TestHelper.getTestMethodName();
+    System.out.println("START " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
+
+    CLUSTER_LIST.forEach(cluster -> {
+      _zkHelixAdmin.getInstancesInCluster(cluster).forEach(instance -> ZKUtil
+          .isInstanceSetup("DummyZk", cluster, instance, InstanceType.PARTICIPANT));
+    });
+
+    System.out.println("END " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * Create resources and see if things get rebalanced correctly.
+   * Helix Java API tested in this methods are:
+   * ZkBaseDataAccessor
+   * ZkHelixClusterVerifier (BestPossible)
+   */
+  @Test(dependsOnMethods = "testZkUtil")
+  public void testCreateAndRebalanceResources() {
+    String methodName = TestHelper.getTestMethodName();
+    System.out.println("START " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
+
+    BaseDataAccessor<ZNRecord> dataAccessorZkAddr = new ZkBaseDataAccessor<>("DummyZk");
+    BaseDataAccessor<ZNRecord> dataAccessorBuilder =
+        new ZkBaseDataAccessor.Builder<ZNRecord>().build();
+
+    String resourceNamePrefix = "DB_";
+    int numResources = 5;
+    int numPartitions = 3;
+    Map<String, Map<String, ZNRecord>> idealStateMap = new HashMap<>();
+
+    for (String cluster : CLUSTER_LIST) {
+      Set<String> resourceNames = new HashSet<>();
+      Set<String> liveInstancesNames = new HashSet<>(dataAccessorZkAddr
+          .getChildNames("/" + cluster + "/LIVEINSTANCES", AccessOption.PERSISTENT));
+
+      for (int i = 0; i < numResources; i++) {
+        String resource = cluster + "_" + resourceNamePrefix + i;
+        _zkHelixAdmin.addResource(cluster, resource, numPartitions, "MasterSlave",
+            IdealState.RebalanceMode.FULL_AUTO.name(), CrushEdRebalanceStrategy.class.getName());
+        _zkHelixAdmin.rebalance(cluster, resource, 3);
+        resourceNames.add(resource);
+
+        // Update IdealState fields with ZkBaseDataAccessor
+        String resourcePath = "/" + cluster + "/IDEALSTATES/" + resource;
+        ZNRecord is = dataAccessorZkAddr.get(resourcePath, null, AccessOption.PERSISTENT);
+        is.setSimpleField(RebalanceConfig.RebalanceConfigProperty.REBALANCER_CLASS_NAME.name(),
+            DelayedAutoRebalancer.class.getName());
+        is.setSimpleField(RebalanceConfig.RebalanceConfigProperty.REBALANCE_STRATEGY.name(),
+            CrushEdRebalanceStrategy.class.getName());
+        dataAccessorZkAddr.set(resourcePath, is, AccessOption.PERSISTENT);
+        idealStateMap.computeIfAbsent(cluster, recordList -> new HashMap<>())
+            .putIfAbsent(is.getId(), is); // Save ZNRecord for comparison later
+      }
+
+      // Create a verifier to make sure all resources have been rebalanced
+      ZkHelixClusterVerifier verifier =
+          new BestPossibleExternalViewVerifier.Builder(cluster).setResources(resourceNames)
+              .setExpectLiveInstances(liveInstancesNames)
+              .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
+      try {
+        Assert.assertTrue(verifier.verifyByPolling());
+      } finally {
+        verifier.close();
+      }
+    }
+
+    // Using the ZkBaseDataAccessor created using the Builder, check that the correct IS is read
+    for (String cluster : CLUSTER_LIST) {
+      Map<String, ZNRecord> savedIdealStates = idealStateMap.get(cluster);
+      List<String> resources = dataAccessorBuilder
+          .getChildNames("/" + cluster + "/IDEALSTATES", AccessOption.PERSISTENT);
+      resources.forEach(resource -> {
+        ZNRecord is = dataAccessorBuilder
+            .get("/" + cluster + "/IDEALSTATES/" + resource, null, AccessOption.PERSISTENT);
+        Assert
+            .assertEquals(is.getSimpleFields(), savedIdealStates.get(is.getId()).getSimpleFields());
+      });
+    }
+
+    dataAccessorZkAddr.close();
+    dataAccessorBuilder.close();
+
+    System.out.println("END " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * This method tests ConfigAccessor.
+   */
+  @Test(dependsOnMethods = "testCreateAndRebalanceResources")
+  public void testConfigAccessor() {
+    String methodName = TestHelper.getTestMethodName();
+    System.out.println("START " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
+
+    // Build two ConfigAccessors to read and write:
+    // 1. ConfigAccessor using a deprecated constructor
+    // 2. ConfigAccessor using the Builder
+    ConfigAccessor configAccessorZkAddr = new ConfigAccessor("DummyZk");
+    ConfigAccessor configAccessorBuilder = new ConfigAccessor.Builder().build();
+
+    setClusterConfigAndVerify(configAccessorZkAddr);
+    setClusterConfigAndVerify(configAccessorBuilder);
+
+    configAccessorZkAddr.close();
+    configAccessorBuilder.close();
+
+    System.out.println("END " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  private void setClusterConfigAndVerify(ConfigAccessor configAccessorMultiZk) {
+    _rawRoutingData.forEach((zkAddr, clusterNamePathList) -> {
+      // Need to rid of "/" because this is a sharding key
+      String cluster = clusterNamePathList.iterator().next().substring(1);
+      ClusterConfig clusterConfig = new ClusterConfig(cluster);
+      clusterConfig.getRecord().setSimpleField("configAccessor", cluster);
+      configAccessorMultiZk.setClusterConfig(cluster, clusterConfig);
+
+      // Now check with a single-realm ConfigAccessor
+      ConfigAccessor configAccessorSingleZk =
+          new ConfigAccessor.Builder().setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+              .setZkAddress(zkAddr).build();
+      Assert.assertEquals(configAccessorSingleZk.getClusterConfig(cluster), clusterConfig);
+
+      // Also check with a single-realm dedicated ZkClient
+      ZNRecord clusterConfigRecord =
+          ZK_CLIENT_MAP.get(zkAddr).readData("/" + cluster + "/CONFIGS/CLUSTER/" + cluster);
+      Assert.assertEquals(clusterConfigRecord, clusterConfig.getRecord());
+
+      // Clean up
+      clusterConfig = new ClusterConfig(cluster);
+      configAccessorMultiZk.setClusterConfig(cluster, clusterConfig);
+    });
+  }
+
+  /**
+   * This test submits multiple tasks to be run.
+   * The Helix Java APIs tested in this method are TaskDriver (HelixManager) and
+   * ZkHelixPropertyStore/ZkCacheBaseDataAccessor.
+   */
+  @Test(dependsOnMethods = "testConfigAccessor")
+  public void testTaskFramework() throws InterruptedException {
+    String methodName = TestHelper.getTestMethodName();
+    System.out.println("START " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
+
+    // Note: TaskDriver is like HelixManager - it only operates on one designated
+    // Create TaskDrivers for all clusters
+    Map<String, TaskDriver> taskDriverMap = new HashMap<>();
+    MOCK_CONTROLLERS
+        .forEach((cluster, controller) -> taskDriverMap.put(cluster, new TaskDriver(controller)));
+
+    // Create a Task Framework workload and start
+    Workflow workflow = WorkflowGenerator.generateNonTargetedSingleWorkflowBuilder("job").build();
+    for (TaskDriver taskDriver : taskDriverMap.values()) {
+      taskDriver.start(workflow);
+    }
+
+    // Use multi-ZK ZkHelixPropertyStore/ZkCacheBaseDataAccessor to query for workflow/job states
+    HelixPropertyStore<ZNRecord> propertyStore =
+        new ZkHelixPropertyStore.Builder<ZNRecord>().build();
+    for (Map.Entry<String, TaskDriver> entry : taskDriverMap.entrySet()) {
+      String cluster = entry.getKey();
+      TaskDriver driver = entry.getValue();
+      // Wait until workflow has completed
+      TaskState wfStateFromTaskDriver =
+          driver.pollForWorkflowState(workflow.getName(), TaskState.COMPLETED);
+      String workflowContextPath =
+          "/" + cluster + "/PROPERTYSTORE/TaskRebalancer/" + workflow.getName() + "/Context";
+      ZNRecord workflowContextRecord =
+          propertyStore.get(workflowContextPath, null, AccessOption.PERSISTENT);
+      WorkflowContext context = new WorkflowContext(workflowContextRecord);
+
+      // Compare the workflow state read from PropertyStore and TaskDriver
+      Assert.assertEquals(context.getWorkflowState(), wfStateFromTaskDriver);
+    }
+
+    System.out.println("END " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * This method tests that ZKHelixAdmin::getClusters() works in multi-zk environment.
+   */
+  @Test(dependsOnMethods = "testTaskFramework")
+  public void testGetAllClusters() {
+    String methodName = TestHelper.getTestMethodName();
+    System.out.println("START " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
+
+    Assert.assertEquals(new HashSet<>(_zkHelixAdmin.getClusters()), new HashSet<>(CLUSTER_LIST));
+
+    System.out.println("END " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * This method tests that GenericBaseDataAccessorBuilder and GenericZkHelixApiBuilder work as
+   * expected. This test focuses on various usage scenarios for ZkBaseDataAccessor.
+   *
+   * Use cases tested:
+   * - Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, ZK address set
+   * - Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, ZK address not set, ZK sharding key set
+   * - Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, ZK address set, ZK sharding key set (ZK addr should override)
+   * - Create ZkBaseDataAccessor, single-realm, sharedZkClient, ZK address set
+   * - Create ZkBaseDataAccessor, single-realm, sharedZkClient, ZK address not set, ZK sharding key set
+   * - Create ZkBaseDataAccessor, single-realm, sharedZkClient, ZK address set, ZK sharding key set (ZK addr should override)
+   * - Create ZkBaseDataAccessor, single-realm, federated ZkClient (should fail)
+   * - Create ZkBaseDataAccessor, multi-realm, dedicated ZkClient (should fail)
+   * - Create ZkBaseDataAccessor, multi-realm, shared ZkClient (should fail)
+   * - Create ZkBaseDataAccessor, multi-realm, federated ZkClient, ZkAddress set (should fail)
+   * - Create ZkBaseDataAccessor, multi-realm, federated ZkClient, Zk sharding key set (should fail because by definition, multi-realm can access multiple sharding keys)
+   * - Create ZkBaseDataAccessor, multi-realm, federated ZkClient
+   * - Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, No ZkAddress set, ConnectionConfig has an invalid ZK sharding key (should fail because it cannot find a valid ZK to connect to)
+   */
+  @Test(dependsOnMethods = "testGetAllClusters")
+  public void testGenericBaseDataAccessorBuilder() {
+    String methodName = TestHelper.getTestMethodName();
+    System.out.println("START " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
+
+    // Custom session timeout value is used to count active connections in SharedZkClientFactory
+    int customSessionTimeout = 10000;
+    String firstZkAddress = ZK_PREFIX + ZK_START_PORT; // has "CLUSTER_1"
+    String firstClusterPath = "/CLUSTER_1";
+    String secondClusterPath = "/CLUSTER_2";
+    ZkBaseDataAccessor.Builder<ZNRecord> zkBaseDataAccessorBuilder =
+        new ZkBaseDataAccessor.Builder<>();
+    RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder =
+        new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
+    connectionConfigBuilder.setSessionTimeout(customSessionTimeout);
+    BaseDataAccessor<ZNRecord> accessor;
+
+    // Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, ZK address set
+    int currentSharedZkClientActiveConnectionCount =
+        SharedZkClientFactory.getInstance().getActiveConnectionCount();
+    accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+        .setZkClientType(ZkClientType.DEDICATED).setZkAddress(firstZkAddress)
+        .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
+    Assert.assertTrue(accessor.exists(firstClusterPath, AccessOption.PERSISTENT));
+    Assert.assertFalse(accessor.exists(secondClusterPath, AccessOption.PERSISTENT));
+    // Check that no extra connection has been created
+    //Assert.assertEquals(SharedZkClientFactory.getInstance().getActiveConnectionCount(),
+    //    currentSharedZkClientActiveConnectionCount);
+    accessor.close();
+
+    // Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, ZK address not set, ZK sharding key set
+    connectionConfigBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+        .setZkRealmShardingKey(firstClusterPath);
+    accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+        .setZkClientType(ZkClientType.DEDICATED).setZkAddress(null)
+        .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
+    Assert.assertTrue(accessor.exists(firstClusterPath, AccessOption.PERSISTENT));
+    Assert.assertFalse(accessor.exists(secondClusterPath, AccessOption.PERSISTENT));
+    Assert.assertEquals(SharedZkClientFactory.getInstance().getActiveConnectionCount(),
+        currentSharedZkClientActiveConnectionCount);
+    accessor.close();
+
+    // Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, ZK address set, ZK sharding key set (ZK addr should override)
+    connectionConfigBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+        .setZkRealmShardingKey(secondClusterPath);
+    accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+        .setZkClientType(ZkClientType.DEDICATED).setZkAddress(firstZkAddress)
+        .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
+    Assert.assertTrue(accessor.exists(firstClusterPath, AccessOption.PERSISTENT));
+    Assert.assertFalse(accessor.exists(secondClusterPath, AccessOption.PERSISTENT));
+    Assert.assertEquals(SharedZkClientFactory.getInstance().getActiveConnectionCount(),
+        currentSharedZkClientActiveConnectionCount);
+    accessor.close();
+
+    // Create ZkBaseDataAccessor, single-realm, sharedZkClient, ZK address set
+    currentSharedZkClientActiveConnectionCount =
+        SharedZkClientFactory.getInstance().getActiveConnectionCount();
+    connectionConfigBuilder.setZkRealmShardingKey(null).setRealmMode(null);
+    accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+        .setZkClientType(ZkClientType.SHARED).setZkAddress(firstZkAddress)
+        .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
+    Assert.assertTrue(accessor.exists(firstClusterPath, AccessOption.PERSISTENT));
+    Assert.assertFalse(accessor.exists(secondClusterPath, AccessOption.PERSISTENT));
+    // Add one to active connection count since this is a shared ZkClientType
+    Assert.assertEquals(SharedZkClientFactory.getInstance().getActiveConnectionCount(),
+        currentSharedZkClientActiveConnectionCount + 1);
+    accessor.close();
+
+    // Create ZkBaseDataAccessor, single-realm, sharedZkClient, ZK address not set, ZK sharding key set
+    connectionConfigBuilder.setZkRealmShardingKey(firstClusterPath)
+        .setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM);
+    accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+        .setZkClientType(ZkClientType.SHARED).setZkAddress(null)
+        .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
+    Assert.assertTrue(accessor.exists(firstClusterPath, AccessOption.PERSISTENT));
+    Assert.assertFalse(accessor.exists(secondClusterPath, AccessOption.PERSISTENT));
+    // Add one to active connection count since this is a shared ZkClientType
+    Assert.assertEquals(SharedZkClientFactory.getInstance().getActiveConnectionCount(),
+        currentSharedZkClientActiveConnectionCount + 1);
+    accessor.close();
+
+    // Create ZkBaseDataAccessor, single-realm, sharedZkClient, ZK address set, ZK sharding key set
+    // (ZK address should override the sharding key setting)
+    connectionConfigBuilder.setZkRealmShardingKey(secondClusterPath)
+        .setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM);
+    accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+        .setZkClientType(ZkClientType.SHARED).setZkAddress(firstZkAddress)
+        .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
+    Assert.assertTrue(accessor.exists(firstClusterPath, AccessOption.PERSISTENT));
+    Assert.assertFalse(accessor.exists(secondClusterPath, AccessOption.PERSISTENT));
+    // Add one to active connection count since this is a shared ZkClientType
+    Assert.assertEquals(SharedZkClientFactory.getInstance().getActiveConnectionCount(),
+        currentSharedZkClientActiveConnectionCount + 1);
+    accessor.close();
+
+    // Create ZkBaseDataAccessor, single-realm, federated ZkClient (should fail)
+    connectionConfigBuilder.setZkRealmShardingKey(null).setRealmMode(null);
+    try {
+      accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+          .setZkClientType(ZkClientType.FEDERATED).setZkAddress(firstZkAddress)
+          .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
+      Assert.fail("SINGLE_REALM and FEDERATED ZkClientType are an invalid combination!");
+    } catch (HelixException e) {
+      // Expected
+    }
+
+    // Create ZkBaseDataAccessor, multi-realm, dedicated ZkClient (should fail)
+    try {
+      accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.MULTI_REALM)
+          .setZkClientType(ZkClientType.DEDICATED).setZkAddress(firstZkAddress)
+          .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
+      Assert.fail("MULTI_REALM and DEDICATED ZkClientType are an invalid combination!");
+    } catch (HelixException e) {
+      // Expected
+    }
+
+    // Create ZkBaseDataAccessor, multi-realm, shared ZkClient (should fail)
+    try {
+      accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.MULTI_REALM)
+          .setZkClientType(ZkClientType.SHARED).setZkAddress(firstZkAddress)
+          .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
+      Assert.fail("MULTI_REALM and SHARED ZkClientType are an invalid combination!");
+    } catch (HelixException e) {
+      // Expected
+    }
+
+    // Create ZkBaseDataAccessor, multi-realm, federated ZkClient, ZkAddress set (should fail)
+    try {
+      accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.MULTI_REALM)
+          .setZkClientType(ZkClientType.FEDERATED).setZkAddress(firstZkAddress)
+          .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
+      Assert.fail("MULTI_REALM and FEDERATED ZkClientType do not connect to one ZK!");
+    } catch (HelixException e) {
+      // Expected
+    }
+
+    // Create ZkBaseDataAccessor, multi-realm, federated ZkClient, Zk sharding key set (should fail
+    // because by definition, multi-realm can access multiple sharding keys)
+    try {
+      connectionConfigBuilder.setZkRealmShardingKey(firstClusterPath)
+          .setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM);
+      accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.MULTI_REALM)
+          .setZkClientType(ZkClientType.FEDERATED).setZkAddress(null)
+          .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
+      Assert.fail("MULTI_REALM and FEDERATED ZkClientType do not connect to one ZK!");
+    } catch (HelixException e) {
+      // Expected
+    }
+
+    // Create ZkBaseDataAccessor, multi-realm, federated ZkClient
+    connectionConfigBuilder.setZkRealmShardingKey(null).setRealmMode(null);
+    accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.MULTI_REALM)
+        .setZkClientType(ZkClientType.FEDERATED).setZkAddress(null)
+        .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
+    Assert.assertTrue(accessor.exists(firstClusterPath, AccessOption.PERSISTENT));
+    Assert.assertTrue(accessor.exists(secondClusterPath, AccessOption.PERSISTENT));
+    accessor.close();
+
+    // Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, No ZkAddress set,
+    // ConnectionConfig has an invalid ZK sharding key (should fail because it cannot find a valid
+    // ZK to connect to)
+    connectionConfigBuilder.setZkRealmShardingKey("/NonexistentShardingKey");
+    try {
+      accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+          .setZkClientType(ZkClientType.DEDICATED).setZkAddress(null)
+          .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
+      Assert.fail("Should fail because it cannot find a valid ZK to connect to!");
+    } catch (NoSuchElementException e) {
+      // Expected because the sharding key wouldn't be found
+    }
+
+    System.out.println("END " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * Tests Helix Java APIs which use different MSDS endpoint configs. Java API should
+   * only connect to the configured MSDS but not the others. The APIs are explicitly tested are:
+   * - ClusterSetup
+   * - HelixAdmin
+   * - ZkUtil
+   * - HelixManager
+   * - BaseDataAccessor
+   * - ConfigAccessor
+   */
+  @Test(dependsOnMethods = "testGenericBaseDataAccessorBuilder")
+  public void testDifferentMsdsEndpointConfigs() throws IOException, InvalidRoutingDataException {
+    String methodName = TestHelper.getTestMethodName();
+    System.out.println("START " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
+
+    final String zkAddress = ZK_SERVER_MAP.keySet().iterator().next();
+    final Map<String, Collection<String>> secondRoutingData =
+        ImmutableMap.of(zkAddress, Collections.singletonList(formPath(CLUSTER_FOUR)));
+    MockMetadataStoreDirectoryServer secondMsds =
+        new MockMetadataStoreDirectoryServer("localhost", 11118, "multiZkTest", secondRoutingData);
+    final RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
+        new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
+            .setRoutingDataSourceType(RoutingDataReaderType.HTTP.name())
+            .setRoutingDataSourceEndpoint(secondMsds.getEndpoint()).build();
+    secondMsds.startServer();
+
+    try {
+      // Verify ClusterSetup
+      verifyClusterSetupMsdsEndpoint(connectionConfig);
+
+      // Verify HelixAdmin
+      verifyHelixAdminMsdsEndpoint(connectionConfig);
+
+      // Verify ZKUtil
+      verifyZkUtilMsdsEndpoint();
+
+      // Verify HelixManager
+      verifyHelixManagerMsdsEndpoint();
+
+      // Verify BaseDataAccessor
+      verifyBaseDataAccessorMsdsEndpoint(connectionConfig);
+
+      // Verify ConfigAccessor
+      verifyConfigAccessorMsdsEndpoint(connectionConfig);
+    } finally {
+      RealmAwareZkClient zkClient = new FederatedZkClient(connectionConfig,
+          new RealmAwareZkClient.RealmAwareZkClientConfig());
+      TestHelper.dropCluster(CLUSTER_FOUR, zkClient);
+      zkClient.close();
+      secondMsds.stopServer();
+    }
+    System.out.println("END " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  private void verifyHelixManagerMsdsEndpoint() {
+    System.out.println("Start " + TestHelper.getTestMethodName());
+
+    // Mock participants are already created and started in the previous test.
+    // The mock participant only connects to MSDS configured in system property,
+    // but not the other.
+    final MockParticipantManager manager = MOCK_PARTICIPANTS.iterator().next();
+    verifyMsdsZkRealm(CLUSTER_FOUR, false,
+        () -> manager.getZkClient().exists(formPath(CLUSTER_FOUR)));
+
+    verifyMsdsZkRealm(CLUSTER_ONE, true,
+        () -> manager.getZkClient().exists(formPath(manager.getClusterName())));
+    if (manager.getZkClient() != null) {
+      System.out.println("zk client is not yet null2");
+    }
+  }
+
+  private void verifyBaseDataAccessorMsdsEndpoint(
+      RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig) {
+    System.out.println("Start " + TestHelper.getTestMethodName());
+    // MSDS endpoint is not configured in builder, so config in system property is used.
+    BaseDataAccessor<ZNRecord> firstDataAccessor =
+        new ZkBaseDataAccessor.Builder<ZNRecord>().build();
+
+    // Create base data accessor with MSDS endpoint configured in builder.
+    BaseDataAccessor<ZNRecord> secondDataAccessor =
+        new ZkBaseDataAccessor.Builder<ZNRecord>().setRealmAwareZkConnectionConfig(connectionConfig)
+            .build();
+
+    String methodName = TestHelper.getTestMethodName();
+    String clusterOnePath = formPath(CLUSTER_ONE, methodName);
+    String clusterFourPath = formPath(CLUSTER_FOUR, methodName);
+    ZNRecord record = new ZNRecord(methodName);
+
+    try {
+      firstDataAccessor.create(clusterOnePath, record, AccessOption.PERSISTENT);
+      secondDataAccessor.create(clusterFourPath, record, AccessOption.PERSISTENT);
+
+      // Verify data accessors that they could only talk to their own configured MSDS endpoint:
+      // either being set in builder or system property.
+      Assert.assertTrue(firstDataAccessor.exists(clusterOnePath, AccessOption.PERSISTENT));
+      verifyMsdsZkRealm(CLUSTER_FOUR, false,
+          () -> firstDataAccessor.exists(clusterFourPath, AccessOption.PERSISTENT));
+
+      Assert.assertTrue(secondDataAccessor.exists(clusterFourPath, AccessOption.PERSISTENT));
+      verifyMsdsZkRealm(CLUSTER_ONE, false,
+          () -> secondDataAccessor.exists(clusterOnePath, AccessOption.PERSISTENT));
+
+      firstDataAccessor.remove(clusterOnePath, AccessOption.PERSISTENT);
+      secondDataAccessor.remove(clusterFourPath, AccessOption.PERSISTENT);
+
+      Assert.assertFalse(firstDataAccessor.exists(clusterOnePath, AccessOption.PERSISTENT));
+      Assert.assertFalse(secondDataAccessor.exists(clusterFourPath, AccessOption.PERSISTENT));
+    } finally {
+      firstDataAccessor.close();
+      secondDataAccessor.close();
+    }
+  }
+
+  private void verifyClusterSetupMsdsEndpoint(
+      RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig) {
+    System.out.println("Start " + TestHelper.getTestMethodName());
+
+    ClusterSetup firstClusterSetup = new ClusterSetup.Builder().build();
+    ClusterSetup secondClusterSetup =
+        new ClusterSetup.Builder().setRealmAwareZkConnectionConfig(connectionConfig).build();
+
+    try {
+      verifyMsdsZkRealm(CLUSTER_ONE, true, () -> firstClusterSetup.addCluster(CLUSTER_ONE, false));
+      verifyMsdsZkRealm(CLUSTER_FOUR, false,
+          () -> firstClusterSetup.addCluster(CLUSTER_FOUR, false));
+
+      verifyMsdsZkRealm(CLUSTER_FOUR, true,
+          () -> secondClusterSetup.addCluster(CLUSTER_FOUR, false));
+      verifyMsdsZkRealm(CLUSTER_ONE, false,
+          () -> secondClusterSetup.addCluster(CLUSTER_ONE, false));
+    } finally {
+      firstClusterSetup.close();
+      secondClusterSetup.close();
+    }
+  }
+
+  private void verifyZkUtilMsdsEndpoint() {
+    System.out.println("Start " + TestHelper.getTestMethodName());
+    String dummyZkAddress = "dummyZkAddress";
+
+    // MSDS endpoint 1
+    verifyMsdsZkRealm(CLUSTER_ONE, true,
+        () -> ZKUtil.getChildren(dummyZkAddress, formPath(CLUSTER_ONE)));
+    // Verify MSDS endpoint 2 is not used by this ZKUtil.
+    verifyMsdsZkRealm(CLUSTER_FOUR, false,
+        () -> ZKUtil.getChildren(dummyZkAddress, formPath(CLUSTER_FOUR)));
+  }
+
+  private void verifyHelixAdminMsdsEndpoint(
+      RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig) {
+    System.out.println("Start " + TestHelper.getTestMethodName());
+
+    HelixAdmin firstHelixAdmin = new ZKHelixAdmin.Builder().build();
+    HelixAdmin secondHelixAdmin =
+        new ZKHelixAdmin.Builder().setRealmAwareZkConnectionConfig(connectionConfig).build();
+
+    try {
+      verifyMsdsZkRealm(CLUSTER_ONE, true, () -> firstHelixAdmin.enableCluster(CLUSTER_ONE, true));
+      verifyMsdsZkRealm(CLUSTER_FOUR, false,
+          () -> firstHelixAdmin.enableCluster(CLUSTER_FOUR, true));
+
+      verifyMsdsZkRealm(CLUSTER_FOUR, true,
+          () -> secondHelixAdmin.enableCluster(CLUSTER_FOUR, true));
+      verifyMsdsZkRealm(CLUSTER_ONE, false,
+          () -> secondHelixAdmin.enableCluster(CLUSTER_ONE, true));
+    } finally {
+      firstHelixAdmin.close();
+      secondHelixAdmin.close();
+    }
+  }
+
+  private void verifyConfigAccessorMsdsEndpoint(
+      RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig) {
+    System.out.println("Start " + TestHelper.getTestMethodName());
+
+    ConfigAccessor firstConfigAccessor = new ConfigAccessor.Builder().build();
+    ConfigAccessor secondConfigAccessor =
+        new ConfigAccessor.Builder().setRealmAwareZkConnectionConfig(connectionConfig).build();
+
+    try {
+      verifyMsdsZkRealm(CLUSTER_ONE, true, () -> firstConfigAccessor.getClusterConfig(CLUSTER_ONE));
+      verifyMsdsZkRealm(CLUSTER_FOUR, false,
+          () -> firstConfigAccessor.getClusterConfig(CLUSTER_FOUR));
+
+      verifyMsdsZkRealm(CLUSTER_FOUR, true,
+          () -> secondConfigAccessor.getClusterConfig(CLUSTER_FOUR));
+      verifyMsdsZkRealm(CLUSTER_ONE, false,
+          () -> secondConfigAccessor.getClusterConfig(CLUSTER_ONE));
+    } finally {
+      firstConfigAccessor.close();
+      secondConfigAccessor.close();
+    }
+  }
+
+  private interface Operation {
+    void run();
+  }
+
+  private void verifyMsdsZkRealm(String cluster, boolean shouldSucceed, Operation operation) {
+    try {
+      operation.run();
+      if (!shouldSucceed) {
+        Assert.fail("Should not connect to the MSDS that has /" + cluster);
+      }
+    } catch (NoSuchElementException e) {
+      if (shouldSucceed) {
+        Assert.fail("ZK Realm should be found for /" + cluster);
+      } else {
+        Assert.assertTrue(e.getMessage()
+            .startsWith("No sharding key found within the provided path. Path: /" + cluster));
+      }
+    } catch (IllegalArgumentException e) {
+      if (shouldSucceed) {
+        Assert.fail(formPath(cluster) + " should be a valid sharding key.");
+      } else {
+        String messageOne = "Given path: /" + cluster + " does not have a "
+            + "valid sharding key or its ZK sharding key is not found in the cached routing data";
+        String messageTwo = "Given path: /" + cluster + "'s ZK sharding key: /" + cluster
+            + " does not match the ZK sharding key";
+        Assert.assertTrue(
+            e.getMessage().startsWith(messageOne) || e.getMessage().startsWith(messageTwo));
+      }
+    } catch (HelixException e) {
+      // NoSuchElementException: "ZK Realm not found!" is swallowed in ZKUtil.isClusterSetup()
+      // Instead, HelixException is thrown.
+      if (shouldSucceed) {
+        Assert.fail("Cluster: " + cluster + " should have been setup.");
+      } else {
+        Assert.assertEquals("fail to get config. cluster: " + cluster + " is NOT setup.",
+            e.getMessage());
+      }
+    } catch (IllegalStateException e) {
+        System.out.println("Exception: " + e);
+    }
+  }
+
+  private String formPath(String... pathNames) {
+    StringBuilder sb = new StringBuilder();
+    for (String name : pathNames) {
+      sb.append('/').append(name);
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Testing using ZK as the routing data source. We use BaseDataAccessor as the representative
+   * Helix API.
+   * Two modes are tested: ZK and HTTP-ZK fallback
+   */
+  @Test(dependsOnMethods = "testDifferentMsdsEndpointConfigs")
+  public void testZkRoutingDataSourceConfigs() {
+    String methodName = TestHelper.getTestMethodName();
+    System.out.println("START " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
+
+    // Set up routing data in ZK by connecting directly to ZK
+    BaseDataAccessor<ZNRecord> accessor =
+        new ZkBaseDataAccessor.Builder<ZNRecord>().setZkAddress(ZK_PREFIX + ZK_START_PORT).build();
+
+    // Create ZK realm routing data ZNRecord
+    _rawRoutingData.forEach((realm, keys) -> {
+      ZNRecord znRecord = new ZNRecord(realm);
+      znRecord.setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY,
+          new ArrayList<>(keys));
+      accessor.set(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realm, znRecord,
+          AccessOption.PERSISTENT);
+    });
+
+    // Create connection configs with the source type set to each type
+    final RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder =
+        new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
+    final RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfigZk =
+        connectionConfigBuilder.setRoutingDataSourceType(RoutingDataReaderType.ZK.name())
+            .setRoutingDataSourceEndpoint(ZK_PREFIX + ZK_START_PORT).build();
+    final RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfigHttpZkFallback =
+        connectionConfigBuilder
+            .setRoutingDataSourceType(RoutingDataReaderType.HTTP_ZK_FALLBACK.name())
+            .setRoutingDataSourceEndpoint(_msdsEndpoint + "," + ZK_PREFIX + ZK_START_PORT).build();
+    final RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfigHttp =
+        connectionConfigBuilder.setRoutingDataSourceType(RoutingDataReaderType.HTTP.name())
+            .setRoutingDataSourceEndpoint(_msdsEndpoint).build();
+
+    // Reset cached routing data
+    RoutingDataManager.getInstance().reset(true);
+    // Shutdown MSDS to ensure that these accessors are able to pull routing data from ZK
+    _msds.stopServer();
+
+    // Create a BaseDataAccessor instance with the connection config
+    BaseDataAccessor<ZNRecord> zkBasedAccessor = new ZkBaseDataAccessor.Builder<ZNRecord>()
+        .setRealmAwareZkConnectionConfig(connectionConfigZk).build();
+    BaseDataAccessor<ZNRecord> httpZkFallbackBasedAccessor =
+        new ZkBaseDataAccessor.Builder<ZNRecord>()
+            .setRealmAwareZkConnectionConfig(connectionConfigHttpZkFallback).build();
+    try {
+      BaseDataAccessor<ZNRecord> httpBasedAccessor = new ZkBaseDataAccessor.Builder<ZNRecord>()
+          .setRealmAwareZkConnectionConfig(connectionConfigHttp).build();
+      Assert.fail("Must fail with a MultiZkException because HTTP connection will be refused.");
+    } catch (MultiZkException e) {
+      // Okay
+    }
+
+    // Check that all clusters appear as existing to this accessor
+    CLUSTER_LIST.forEach(cluster -> {
+      Assert.assertTrue(zkBasedAccessor.exists("/" + cluster, AccessOption.PERSISTENT));
+      Assert.assertTrue(httpZkFallbackBasedAccessor.exists("/" + cluster, AccessOption.PERSISTENT));
+    });
+
+    // Close all connections
+    accessor.close();
+    zkBasedAccessor.close();
+    httpZkFallbackBasedAccessor.close();
+
+    System.out.println("END " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
deleted file mode 100644
index 3336c56b7..000000000
--- a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
+++ /dev/null
@@ -1,866 +0,0 @@
-package org.apache.helix.integration.multizk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.Date;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.helix.AccessOption;
-import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixException;
-import org.apache.helix.InstanceType;
-import org.apache.helix.TestHelper;
-import org.apache.helix.api.config.RebalanceConfig;
-import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
-import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.integration.task.WorkflowGenerator;
-import org.apache.helix.manager.zk.ZKHelixAdmin;
-import org.apache.helix.manager.zk.ZKUtil;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
-import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
-import org.apache.helix.msdcommon.mock.MockMetadataStoreDirectoryServer;
-import org.apache.helix.store.HelixPropertyStore;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskState;
-import org.apache.helix.task.Workflow;
-import org.apache.helix.task.WorkflowContext;
-import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
-import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
-import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
-import org.apache.helix.zookeeper.api.client.ZkClientType;
-import org.apache.helix.zookeeper.constant.RoutingDataReaderType;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.helix.zookeeper.exception.MultiZkException;
-import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
-import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
-import org.apache.helix.zookeeper.routing.RoutingDataManager;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-/**
- * TestMultiZkHelixJavaApis spins up multiple in-memory ZooKeepers with a pre-configured
- * cluster-Zk realm routing information.
- * This test verifies that all Helix Java APIs work as expected.
- */
-public class TestMultiZkHelixJavaApis extends TestMultiZkConnectionConfig {
-  // For testing different MSDS endpoint configs.
-  private static final String CLUSTER_ONE = CLUSTER_LIST.get(0);
-  private static final String CLUSTER_FOUR = "CLUSTER_4";
-  private static final String _className = TestHelper.getTestClassName();
-
-  @BeforeClass
-  public void beforeClass() throws Exception {
-    super.beforeClass();
-    // MSDS endpoint: http://localhost:11117/admin/v2/namespaces/multiZkTest
-    System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY, _msdsEndpoint);
-
-    // Routing data may be set by other tests using the same endpoint; reset() for good measure
-    RoutingDataManager.getInstance().reset(true);
-    // Create a FederatedZkClient for admin work
-    _zkClient =
-        new FederatedZkClient(new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(),
-            new RealmAwareZkClient.RealmAwareZkClientConfig());
-  }
-
-  @Override
-  public void setupCluster() {
-    // Create two ClusterSetups using two different constructors
-    // Note: ZK Address here could be anything because multiZk mode is on (it will be ignored)
-    _clusterSetupZkAddr = new ClusterSetup(ZK_SERVER_MAP.keySet().iterator().next());
-    _clusterSetupBuilder = new ClusterSetup.Builder().build();
-  }
-
-  @Override
-  protected void createZkConnectionConfigs(String clusterName) {
-    RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder =
-        new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
-    // Try with a connection config without ZK realm sharding key set (should fail)
-    _invalidZkConnectionConfig = connectionConfigBuilder.build();
-    _validZkConnectionConfig = connectionConfigBuilder.setZkRealmShardingKey("/" + clusterName).build();
-  }
-  @Override
-  @Test
-  public void testCreateClusters() {
-    String methodName = TestHelper.getTestMethodName();
-    System.out.println("START " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
-
-    setupCluster();
-
-    createClusters(_clusterSetupZkAddr);
-    verifyClusterCreation(_clusterSetupZkAddr);
-
-    createClusters(_clusterSetupBuilder);
-    verifyClusterCreation(_clusterSetupBuilder);
-
-    // Create clusters again to continue with testing
-    createClusters(_clusterSetupBuilder);
-
-    _clusterSetupZkAddr.close();
-    _clusterSetupBuilder.close();
-
-    System.out.println("END " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
-  }
-
-  @Override
-  @Test(dependsOnMethods = "testCreateClusters")
-  public void testCreateParticipants() throws Exception {
-    String methodName = TestHelper.getTestMethodName();
-    System.out.println("START " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
-
-    super.testCreateParticipants();
-
-    System.out.println("END " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
-  }
-
-  @Override
-  @Test(dependsOnMethods = "testCreateParticipants")
-  public void testZKHelixManager() throws Exception {
-    String methodName = TestHelper.getTestMethodName();
-    System.out.println("START " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
-
-    super.testZKHelixManager();
-
-    System.out.println("END " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
-  }
-
-  @Override
-  @Test(dependsOnMethods = "testZKHelixManager")
-  public void testZKHelixManagerCloudConfig() throws Exception {
-    String methodName = TestHelper.getTestMethodName();
-    System.out.println("START " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
-
-    super.testZKHelixManagerCloudConfig();
-
-    System.out.println("END " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
-  }
-
-  @Test(dependsOnMethods = "testZKHelixManager")
-  public void testZkUtil() {
-    String methodName = TestHelper.getTestMethodName();
-    System.out.println("START " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
-
-    CLUSTER_LIST.forEach(cluster -> {
-      _zkHelixAdmin.getInstancesInCluster(cluster).forEach(instance -> ZKUtil
-          .isInstanceSetup("DummyZk", cluster, instance, InstanceType.PARTICIPANT));
-    });
-
-    System.out.println("END " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
-  }
-
-  /**
-   * Create resources and see if things get rebalanced correctly.
-   * Helix Java API tested in this methods are:
-   * ZkBaseDataAccessor
-   * ZkHelixClusterVerifier (BestPossible)
-   */
-  @Test(dependsOnMethods = "testZkUtil")
-  public void testCreateAndRebalanceResources() {
-    String methodName = TestHelper.getTestMethodName();
-    System.out.println("START " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
-
-    BaseDataAccessor<ZNRecord> dataAccessorZkAddr = new ZkBaseDataAccessor<>("DummyZk");
-    BaseDataAccessor<ZNRecord> dataAccessorBuilder =
-        new ZkBaseDataAccessor.Builder<ZNRecord>().build();
-
-    String resourceNamePrefix = "DB_";
-    int numResources = 5;
-    int numPartitions = 3;
-    Map<String, Map<String, ZNRecord>> idealStateMap = new HashMap<>();
-
-    for (String cluster : CLUSTER_LIST) {
-      Set<String> resourceNames = new HashSet<>();
-      Set<String> liveInstancesNames = new HashSet<>(dataAccessorZkAddr
-          .getChildNames("/" + cluster + "/LIVEINSTANCES", AccessOption.PERSISTENT));
-
-      for (int i = 0; i < numResources; i++) {
-        String resource = cluster + "_" + resourceNamePrefix + i;
-        _zkHelixAdmin.addResource(cluster, resource, numPartitions, "MasterSlave",
-            IdealState.RebalanceMode.FULL_AUTO.name(), CrushEdRebalanceStrategy.class.getName());
-        _zkHelixAdmin.rebalance(cluster, resource, 3);
-        resourceNames.add(resource);
-
-        // Update IdealState fields with ZkBaseDataAccessor
-        String resourcePath = "/" + cluster + "/IDEALSTATES/" + resource;
-        ZNRecord is = dataAccessorZkAddr.get(resourcePath, null, AccessOption.PERSISTENT);
-        is.setSimpleField(RebalanceConfig.RebalanceConfigProperty.REBALANCER_CLASS_NAME.name(),
-            DelayedAutoRebalancer.class.getName());
-        is.setSimpleField(RebalanceConfig.RebalanceConfigProperty.REBALANCE_STRATEGY.name(),
-            CrushEdRebalanceStrategy.class.getName());
-        dataAccessorZkAddr.set(resourcePath, is, AccessOption.PERSISTENT);
-        idealStateMap.computeIfAbsent(cluster, recordList -> new HashMap<>())
-            .putIfAbsent(is.getId(), is); // Save ZNRecord for comparison later
-      }
-
-      // Create a verifier to make sure all resources have been rebalanced
-      ZkHelixClusterVerifier verifier =
-          new BestPossibleExternalViewVerifier.Builder(cluster).setResources(resourceNames)
-              .setExpectLiveInstances(liveInstancesNames)
-              .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
-      try {
-        Assert.assertTrue(verifier.verifyByPolling());
-      } finally {
-        verifier.close();
-      }
-    }
-
-    // Using the ZkBaseDataAccessor created using the Builder, check that the correct IS is read
-    for (String cluster : CLUSTER_LIST) {
-      Map<String, ZNRecord> savedIdealStates = idealStateMap.get(cluster);
-      List<String> resources = dataAccessorBuilder
-          .getChildNames("/" + cluster + "/IDEALSTATES", AccessOption.PERSISTENT);
-      resources.forEach(resource -> {
-        ZNRecord is = dataAccessorBuilder
-            .get("/" + cluster + "/IDEALSTATES/" + resource, null, AccessOption.PERSISTENT);
-        Assert
-            .assertEquals(is.getSimpleFields(), savedIdealStates.get(is.getId()).getSimpleFields());
-      });
-    }
-
-    dataAccessorZkAddr.close();
-    dataAccessorBuilder.close();
-
-    System.out.println("END " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
-  }
-
-  /**
-   * This method tests ConfigAccessor.
-   */
-  @Test(dependsOnMethods = "testCreateAndRebalanceResources")
-  public void testConfigAccessor() {
-    String methodName = TestHelper.getTestMethodName();
-    System.out.println("START " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
-
-    // Build two ConfigAccessors to read and write:
-    // 1. ConfigAccessor using a deprecated constructor
-    // 2. ConfigAccessor using the Builder
-    ConfigAccessor configAccessorZkAddr = new ConfigAccessor("DummyZk");
-    ConfigAccessor configAccessorBuilder = new ConfigAccessor.Builder().build();
-
-    setClusterConfigAndVerify(configAccessorZkAddr);
-    setClusterConfigAndVerify(configAccessorBuilder);
-
-    configAccessorZkAddr.close();
-    configAccessorBuilder.close();
-
-    System.out.println("END " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
-  }
-
-  private void setClusterConfigAndVerify(ConfigAccessor configAccessorMultiZk) {
-    _rawRoutingData.forEach((zkAddr, clusterNamePathList) -> {
-      // Need to rid of "/" because this is a sharding key
-      String cluster = clusterNamePathList.iterator().next().substring(1);
-      ClusterConfig clusterConfig = new ClusterConfig(cluster);
-      clusterConfig.getRecord().setSimpleField("configAccessor", cluster);
-      configAccessorMultiZk.setClusterConfig(cluster, clusterConfig);
-
-      // Now check with a single-realm ConfigAccessor
-      ConfigAccessor configAccessorSingleZk =
-          new ConfigAccessor.Builder().setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
-              .setZkAddress(zkAddr).build();
-      Assert.assertEquals(configAccessorSingleZk.getClusterConfig(cluster), clusterConfig);
-
-      // Also check with a single-realm dedicated ZkClient
-      ZNRecord clusterConfigRecord =
-          ZK_CLIENT_MAP.get(zkAddr).readData("/" + cluster + "/CONFIGS/CLUSTER/" + cluster);
-      Assert.assertEquals(clusterConfigRecord, clusterConfig.getRecord());
-
-      // Clean up
-      clusterConfig = new ClusterConfig(cluster);
-      configAccessorMultiZk.setClusterConfig(cluster, clusterConfig);
-    });
-  }
-
-  /**
-   * This test submits multiple tasks to be run.
-   * The Helix Java APIs tested in this method are TaskDriver (HelixManager) and
-   * ZkHelixPropertyStore/ZkCacheBaseDataAccessor.
-   */
-  @Test(dependsOnMethods = "testConfigAccessor")
-  public void testTaskFramework() throws InterruptedException {
-    String methodName = TestHelper.getTestMethodName();
-    System.out.println("START " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
-
-    // Note: TaskDriver is like HelixManager - it only operates on one designated
-    // Create TaskDrivers for all clusters
-    Map<String, TaskDriver> taskDriverMap = new HashMap<>();
-    MOCK_CONTROLLERS
-        .forEach((cluster, controller) -> taskDriverMap.put(cluster, new TaskDriver(controller)));
-
-    // Create a Task Framework workload and start
-    Workflow workflow = WorkflowGenerator.generateNonTargetedSingleWorkflowBuilder("job").build();
-    for (TaskDriver taskDriver : taskDriverMap.values()) {
-      taskDriver.start(workflow);
-    }
-
-    // Use multi-ZK ZkHelixPropertyStore/ZkCacheBaseDataAccessor to query for workflow/job states
-    HelixPropertyStore<ZNRecord> propertyStore =
-        new ZkHelixPropertyStore.Builder<ZNRecord>().build();
-    for (Map.Entry<String, TaskDriver> entry : taskDriverMap.entrySet()) {
-      String cluster = entry.getKey();
-      TaskDriver driver = entry.getValue();
-      // Wait until workflow has completed
-      TaskState wfStateFromTaskDriver =
-          driver.pollForWorkflowState(workflow.getName(), TaskState.COMPLETED);
-      String workflowContextPath =
-          "/" + cluster + "/PROPERTYSTORE/TaskRebalancer/" + workflow.getName() + "/Context";
-      ZNRecord workflowContextRecord =
-          propertyStore.get(workflowContextPath, null, AccessOption.PERSISTENT);
-      WorkflowContext context = new WorkflowContext(workflowContextRecord);
-
-      // Compare the workflow state read from PropertyStore and TaskDriver
-      Assert.assertEquals(context.getWorkflowState(), wfStateFromTaskDriver);
-    }
-
-    System.out.println("END " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
-  }
-
-  /**
-   * This method tests that ZKHelixAdmin::getClusters() works in multi-zk environment.
-   */
-  @Test(dependsOnMethods = "testTaskFramework")
-  public void testGetAllClusters() {
-    String methodName = TestHelper.getTestMethodName();
-    System.out.println("START " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
-
-    Assert.assertEquals(new HashSet<>(_zkHelixAdmin.getClusters()), new HashSet<>(CLUSTER_LIST));
-
-    System.out.println("END " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
-  }
-
-  /**
-   * This method tests that GenericBaseDataAccessorBuilder and GenericZkHelixApiBuilder work as
-   * expected. This test focuses on various usage scenarios for ZkBaseDataAccessor.
-   *
-   * Use cases tested:
-   * - Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, ZK address set
-   * - Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, ZK address not set, ZK sharding key set
-   * - Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, ZK address set, ZK sharding key set (ZK addr should override)
-   * - Create ZkBaseDataAccessor, single-realm, sharedZkClient, ZK address set
-   * - Create ZkBaseDataAccessor, single-realm, sharedZkClient, ZK address not set, ZK sharding key set
-   * - Create ZkBaseDataAccessor, single-realm, sharedZkClient, ZK address set, ZK sharding key set (ZK addr should override)
-   * - Create ZkBaseDataAccessor, single-realm, federated ZkClient (should fail)
-   * - Create ZkBaseDataAccessor, multi-realm, dedicated ZkClient (should fail)
-   * - Create ZkBaseDataAccessor, multi-realm, shared ZkClient (should fail)
-   * - Create ZkBaseDataAccessor, multi-realm, federated ZkClient, ZkAddress set (should fail)
-   * - Create ZkBaseDataAccessor, multi-realm, federated ZkClient, Zk sharding key set (should fail because by definition, multi-realm can access multiple sharding keys)
-   * - Create ZkBaseDataAccessor, multi-realm, federated ZkClient
-   * - Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, No ZkAddress set, ConnectionConfig has an invalid ZK sharding key (should fail because it cannot find a valid ZK to connect to)
-   */
-  @Test(dependsOnMethods = "testGetAllClusters")
-  public void testGenericBaseDataAccessorBuilder() {
-    String methodName = TestHelper.getTestMethodName();
-    System.out.println("START " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
-
-    // Custom session timeout value is used to count active connections in SharedZkClientFactory
-    int customSessionTimeout = 10000;
-    String firstZkAddress = ZK_PREFIX + ZK_START_PORT; // has "CLUSTER_1"
-    String firstClusterPath = "/CLUSTER_1";
-    String secondClusterPath = "/CLUSTER_2";
-    ZkBaseDataAccessor.Builder<ZNRecord> zkBaseDataAccessorBuilder =
-        new ZkBaseDataAccessor.Builder<>();
-    RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder =
-        new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
-    connectionConfigBuilder.setSessionTimeout(customSessionTimeout);
-    BaseDataAccessor<ZNRecord> accessor;
-
-    // Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, ZK address set
-    int currentSharedZkClientActiveConnectionCount =
-        SharedZkClientFactory.getInstance().getActiveConnectionCount();
-    accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
-        .setZkClientType(ZkClientType.DEDICATED).setZkAddress(firstZkAddress)
-        .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
-    Assert.assertTrue(accessor.exists(firstClusterPath, AccessOption.PERSISTENT));
-    Assert.assertFalse(accessor.exists(secondClusterPath, AccessOption.PERSISTENT));
-    // Check that no extra connection has been created
-    Assert.assertEquals(SharedZkClientFactory.getInstance().getActiveConnectionCount(),
-        currentSharedZkClientActiveConnectionCount);
-    accessor.close();
-
-    // Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, ZK address not set, ZK sharding key set
-    connectionConfigBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
-        .setZkRealmShardingKey(firstClusterPath);
-    accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
-        .setZkClientType(ZkClientType.DEDICATED).setZkAddress(null)
-        .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
-    Assert.assertTrue(accessor.exists(firstClusterPath, AccessOption.PERSISTENT));
-    Assert.assertFalse(accessor.exists(secondClusterPath, AccessOption.PERSISTENT));
-    Assert.assertEquals(SharedZkClientFactory.getInstance().getActiveConnectionCount(),
-        currentSharedZkClientActiveConnectionCount);
-    accessor.close();
-
-    // Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, ZK address set, ZK sharding key set (ZK addr should override)
-    connectionConfigBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
-        .setZkRealmShardingKey(secondClusterPath);
-    accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
-        .setZkClientType(ZkClientType.DEDICATED).setZkAddress(firstZkAddress)
-        .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
-    Assert.assertTrue(accessor.exists(firstClusterPath, AccessOption.PERSISTENT));
-    Assert.assertFalse(accessor.exists(secondClusterPath, AccessOption.PERSISTENT));
-    Assert.assertEquals(SharedZkClientFactory.getInstance().getActiveConnectionCount(),
-        currentSharedZkClientActiveConnectionCount);
-    accessor.close();
-
-    // Create ZkBaseDataAccessor, single-realm, sharedZkClient, ZK address set
-    currentSharedZkClientActiveConnectionCount =
-        SharedZkClientFactory.getInstance().getActiveConnectionCount();
-    connectionConfigBuilder.setZkRealmShardingKey(null).setRealmMode(null);
-    accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
-        .setZkClientType(ZkClientType.SHARED).setZkAddress(firstZkAddress)
-        .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
-    Assert.assertTrue(accessor.exists(firstClusterPath, AccessOption.PERSISTENT));
-    Assert.assertFalse(accessor.exists(secondClusterPath, AccessOption.PERSISTENT));
-    // Add one to active connection count since this is a shared ZkClientType
-    Assert.assertEquals(SharedZkClientFactory.getInstance().getActiveConnectionCount(),
-        currentSharedZkClientActiveConnectionCount + 1);
-    accessor.close();
-
-    // Create ZkBaseDataAccessor, single-realm, sharedZkClient, ZK address not set, ZK sharding key set
-    connectionConfigBuilder.setZkRealmShardingKey(firstClusterPath)
-        .setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM);
-    accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
-        .setZkClientType(ZkClientType.SHARED).setZkAddress(null)
-        .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
-    Assert.assertTrue(accessor.exists(firstClusterPath, AccessOption.PERSISTENT));
-    Assert.assertFalse(accessor.exists(secondClusterPath, AccessOption.PERSISTENT));
-    // Add one to active connection count since this is a shared ZkClientType
-    Assert.assertEquals(SharedZkClientFactory.getInstance().getActiveConnectionCount(),
-        currentSharedZkClientActiveConnectionCount + 1);
-    accessor.close();
-
-    // Create ZkBaseDataAccessor, single-realm, sharedZkClient, ZK address set, ZK sharding key set
-    // (ZK address should override the sharding key setting)
-    connectionConfigBuilder.setZkRealmShardingKey(secondClusterPath)
-        .setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM);
-    accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
-        .setZkClientType(ZkClientType.SHARED).setZkAddress(firstZkAddress)
-        .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
-    Assert.assertTrue(accessor.exists(firstClusterPath, AccessOption.PERSISTENT));
-    Assert.assertFalse(accessor.exists(secondClusterPath, AccessOption.PERSISTENT));
-    // Add one to active connection count since this is a shared ZkClientType
-    Assert.assertEquals(SharedZkClientFactory.getInstance().getActiveConnectionCount(),
-        currentSharedZkClientActiveConnectionCount + 1);
-    accessor.close();
-
-    // Create ZkBaseDataAccessor, single-realm, federated ZkClient (should fail)
-    connectionConfigBuilder.setZkRealmShardingKey(null).setRealmMode(null);
-    try {
-      accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
-          .setZkClientType(ZkClientType.FEDERATED).setZkAddress(firstZkAddress)
-          .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
-      Assert.fail("SINGLE_REALM and FEDERATED ZkClientType are an invalid combination!");
-    } catch (HelixException e) {
-      // Expected
-    }
-
-    // Create ZkBaseDataAccessor, multi-realm, dedicated ZkClient (should fail)
-    try {
-      accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.MULTI_REALM)
-          .setZkClientType(ZkClientType.DEDICATED).setZkAddress(firstZkAddress)
-          .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
-      Assert.fail("MULTI_REALM and DEDICATED ZkClientType are an invalid combination!");
-    } catch (HelixException e) {
-      // Expected
-    }
-
-    // Create ZkBaseDataAccessor, multi-realm, shared ZkClient (should fail)
-    try {
-      accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.MULTI_REALM)
-          .setZkClientType(ZkClientType.SHARED).setZkAddress(firstZkAddress)
-          .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
-      Assert.fail("MULTI_REALM and SHARED ZkClientType are an invalid combination!");
-    } catch (HelixException e) {
-      // Expected
-    }
-
-    // Create ZkBaseDataAccessor, multi-realm, federated ZkClient, ZkAddress set (should fail)
-    try {
-      accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.MULTI_REALM)
-          .setZkClientType(ZkClientType.FEDERATED).setZkAddress(firstZkAddress)
-          .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
-      Assert.fail("MULTI_REALM and FEDERATED ZkClientType do not connect to one ZK!");
-    } catch (HelixException e) {
-      // Expected
-    }
-
-    // Create ZkBaseDataAccessor, multi-realm, federated ZkClient, Zk sharding key set (should fail
-    // because by definition, multi-realm can access multiple sharding keys)
-    try {
-      connectionConfigBuilder.setZkRealmShardingKey(firstClusterPath)
-          .setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM);
-      accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.MULTI_REALM)
-          .setZkClientType(ZkClientType.FEDERATED).setZkAddress(null)
-          .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
-      Assert.fail("MULTI_REALM and FEDERATED ZkClientType do not connect to one ZK!");
-    } catch (HelixException e) {
-      // Expected
-    }
-
-    // Create ZkBaseDataAccessor, multi-realm, federated ZkClient
-    connectionConfigBuilder.setZkRealmShardingKey(null).setRealmMode(null);
-    accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.MULTI_REALM)
-        .setZkClientType(ZkClientType.FEDERATED).setZkAddress(null)
-        .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
-    Assert.assertTrue(accessor.exists(firstClusterPath, AccessOption.PERSISTENT));
-    Assert.assertTrue(accessor.exists(secondClusterPath, AccessOption.PERSISTENT));
-    accessor.close();
-
-    // Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, No ZkAddress set,
-    // ConnectionConfig has an invalid ZK sharding key (should fail because it cannot find a valid
-    // ZK to connect to)
-    connectionConfigBuilder.setZkRealmShardingKey("/NonexistentShardingKey");
-    try {
-      accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
-          .setZkClientType(ZkClientType.DEDICATED).setZkAddress(null)
-          .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
-      Assert.fail("Should fail because it cannot find a valid ZK to connect to!");
-    } catch (NoSuchElementException e) {
-      // Expected because the sharding key wouldn't be found
-    }
-
-    System.out.println("END " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
-  }
-
-  /**
-   * Tests Helix Java APIs which use different MSDS endpoint configs. Java API should
-   * only connect to the configured MSDS but not the others. The APIs are explicitly tested are:
-   * - ClusterSetup
-   * - HelixAdmin
-   * - ZkUtil
-   * - HelixManager
-   * - BaseDataAccessor
-   * - ConfigAccessor
-   */
-  @Test(dependsOnMethods = "testGenericBaseDataAccessorBuilder")
-  public void testDifferentMsdsEndpointConfigs() throws IOException, InvalidRoutingDataException {
-    String methodName = TestHelper.getTestMethodName();
-    System.out.println("START " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
-
-    final String zkAddress = ZK_SERVER_MAP.keySet().iterator().next();
-    final Map<String, Collection<String>> secondRoutingData =
-        ImmutableMap.of(zkAddress, Collections.singletonList(formPath(CLUSTER_FOUR)));
-    MockMetadataStoreDirectoryServer secondMsds =
-        new MockMetadataStoreDirectoryServer("localhost", 11118, "multiZkTest", secondRoutingData);
-    final RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
-        new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
-            .setRoutingDataSourceType(RoutingDataReaderType.HTTP.name())
-            .setRoutingDataSourceEndpoint(secondMsds.getEndpoint()).build();
-    secondMsds.startServer();
-
-    try {
-      // Verify ClusterSetup
-      verifyClusterSetupMsdsEndpoint(connectionConfig);
-
-      // Verify HelixAdmin
-      verifyHelixAdminMsdsEndpoint(connectionConfig);
-
-      // Verify ZKUtil
-      verifyZkUtilMsdsEndpoint();
-
-      // Verify HelixManager
-      verifyHelixManagerMsdsEndpoint();
-
-      // Verify BaseDataAccessor
-      verifyBaseDataAccessorMsdsEndpoint(connectionConfig);
-
-      // Verify ConfigAccessor
-      verifyConfigAccessorMsdsEndpoint(connectionConfig);
-    } finally {
-      RealmAwareZkClient zkClient = new FederatedZkClient(connectionConfig,
-          new RealmAwareZkClient.RealmAwareZkClientConfig());
-      TestHelper.dropCluster(CLUSTER_FOUR, zkClient);
-      zkClient.close();
-      secondMsds.stopServer();
-    }
-    System.out.println("END " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
-  }
-
-  private void verifyHelixManagerMsdsEndpoint() {
-    System.out.println("Start " + TestHelper.getTestMethodName());
-
-    // Mock participants are already created and started in the previous test.
-    // The mock participant only connects to MSDS configured in system property,
-    // but not the other.
-    final MockParticipantManager manager = MOCK_PARTICIPANTS.iterator().next();
-    verifyMsdsZkRealm(CLUSTER_ONE, true,
-        () -> manager.getZkClient().exists(formPath(manager.getClusterName())));
-    verifyMsdsZkRealm(CLUSTER_FOUR, false,
-        () -> manager.getZkClient().exists(formPath(CLUSTER_FOUR)));
-  }
-
-  private void verifyBaseDataAccessorMsdsEndpoint(
-      RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig) {
-    System.out.println("Start " + TestHelper.getTestMethodName());
-    // MSDS endpoint is not configured in builder, so config in system property is used.
-    BaseDataAccessor<ZNRecord> firstDataAccessor =
-        new ZkBaseDataAccessor.Builder<ZNRecord>().build();
-
-    // Create base data accessor with MSDS endpoint configured in builder.
-    BaseDataAccessor<ZNRecord> secondDataAccessor =
-        new ZkBaseDataAccessor.Builder<ZNRecord>().setRealmAwareZkConnectionConfig(connectionConfig)
-            .build();
-
-    String methodName = TestHelper.getTestMethodName();
-    String clusterOnePath = formPath(CLUSTER_ONE, methodName);
-    String clusterFourPath = formPath(CLUSTER_FOUR, methodName);
-    ZNRecord record = new ZNRecord(methodName);
-
-    try {
-      firstDataAccessor.create(clusterOnePath, record, AccessOption.PERSISTENT);
-      secondDataAccessor.create(clusterFourPath, record, AccessOption.PERSISTENT);
-
-      // Verify data accessors that they could only talk to their own configured MSDS endpoint:
-      // either being set in builder or system property.
-      Assert.assertTrue(firstDataAccessor.exists(clusterOnePath, AccessOption.PERSISTENT));
-      verifyMsdsZkRealm(CLUSTER_FOUR, false,
-          () -> firstDataAccessor.exists(clusterFourPath, AccessOption.PERSISTENT));
-
-      Assert.assertTrue(secondDataAccessor.exists(clusterFourPath, AccessOption.PERSISTENT));
-      verifyMsdsZkRealm(CLUSTER_ONE, false,
-          () -> secondDataAccessor.exists(clusterOnePath, AccessOption.PERSISTENT));
-
-      firstDataAccessor.remove(clusterOnePath, AccessOption.PERSISTENT);
-      secondDataAccessor.remove(clusterFourPath, AccessOption.PERSISTENT);
-
-      Assert.assertFalse(firstDataAccessor.exists(clusterOnePath, AccessOption.PERSISTENT));
-      Assert.assertFalse(secondDataAccessor.exists(clusterFourPath, AccessOption.PERSISTENT));
-    } finally {
-      firstDataAccessor.close();
-      secondDataAccessor.close();
-    }
-  }
-
-  private void verifyClusterSetupMsdsEndpoint(
-      RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig) {
-    System.out.println("Start " + TestHelper.getTestMethodName());
-
-    ClusterSetup firstClusterSetup = new ClusterSetup.Builder().build();
-    ClusterSetup secondClusterSetup =
-        new ClusterSetup.Builder().setRealmAwareZkConnectionConfig(connectionConfig).build();
-
-    try {
-      verifyMsdsZkRealm(CLUSTER_ONE, true, () -> firstClusterSetup.addCluster(CLUSTER_ONE, false));
-      verifyMsdsZkRealm(CLUSTER_FOUR, false,
-          () -> firstClusterSetup.addCluster(CLUSTER_FOUR, false));
-
-      verifyMsdsZkRealm(CLUSTER_FOUR, true,
-          () -> secondClusterSetup.addCluster(CLUSTER_FOUR, false));
-      verifyMsdsZkRealm(CLUSTER_ONE, false,
-          () -> secondClusterSetup.addCluster(CLUSTER_ONE, false));
-    } finally {
-      firstClusterSetup.close();
-      secondClusterSetup.close();
-    }
-  }
-
-  private void verifyZkUtilMsdsEndpoint() {
-    System.out.println("Start " + TestHelper.getTestMethodName());
-    String dummyZkAddress = "dummyZkAddress";
-
-    // MSDS endpoint 1
-    verifyMsdsZkRealm(CLUSTER_ONE, true,
-        () -> ZKUtil.getChildren(dummyZkAddress, formPath(CLUSTER_ONE)));
-    // Verify MSDS endpoint 2 is not used by this ZKUtil.
-    verifyMsdsZkRealm(CLUSTER_FOUR, false,
-        () -> ZKUtil.getChildren(dummyZkAddress, formPath(CLUSTER_FOUR)));
-  }
-
-  private void verifyHelixAdminMsdsEndpoint(
-      RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig) {
-    System.out.println("Start " + TestHelper.getTestMethodName());
-
-    HelixAdmin firstHelixAdmin = new ZKHelixAdmin.Builder().build();
-    HelixAdmin secondHelixAdmin =
-        new ZKHelixAdmin.Builder().setRealmAwareZkConnectionConfig(connectionConfig).build();
-
-    try {
-      verifyMsdsZkRealm(CLUSTER_ONE, true, () -> firstHelixAdmin.enableCluster(CLUSTER_ONE, true));
-      verifyMsdsZkRealm(CLUSTER_FOUR, false,
-          () -> firstHelixAdmin.enableCluster(CLUSTER_FOUR, true));
-
-      verifyMsdsZkRealm(CLUSTER_FOUR, true,
-          () -> secondHelixAdmin.enableCluster(CLUSTER_FOUR, true));
-      verifyMsdsZkRealm(CLUSTER_ONE, false,
-          () -> secondHelixAdmin.enableCluster(CLUSTER_ONE, true));
-    } finally {
-      firstHelixAdmin.close();
-      secondHelixAdmin.close();
-    }
-  }
-
-  private void verifyConfigAccessorMsdsEndpoint(
-      RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig) {
-    System.out.println("Start " + TestHelper.getTestMethodName());
-
-    ConfigAccessor firstConfigAccessor = new ConfigAccessor.Builder().build();
-    ConfigAccessor secondConfigAccessor =
-        new ConfigAccessor.Builder().setRealmAwareZkConnectionConfig(connectionConfig).build();
-
-    try {
-      verifyMsdsZkRealm(CLUSTER_ONE, true, () -> firstConfigAccessor.getClusterConfig(CLUSTER_ONE));
-      verifyMsdsZkRealm(CLUSTER_FOUR, false,
-          () -> firstConfigAccessor.getClusterConfig(CLUSTER_FOUR));
-
-      verifyMsdsZkRealm(CLUSTER_FOUR, true,
-          () -> secondConfigAccessor.getClusterConfig(CLUSTER_FOUR));
-      verifyMsdsZkRealm(CLUSTER_ONE, false,
-          () -> secondConfigAccessor.getClusterConfig(CLUSTER_ONE));
-    } finally {
-      firstConfigAccessor.close();
-      secondConfigAccessor.close();
-    }
-  }
-
-  private interface Operation {
-    void run();
-  }
-
-  private void verifyMsdsZkRealm(String cluster, boolean shouldSucceed, Operation operation) {
-    try {
-      operation.run();
-      if (!shouldSucceed) {
-        Assert.fail("Should not connect to the MSDS that has /" + cluster);
-      }
-    } catch (NoSuchElementException e) {
-      if (shouldSucceed) {
-        Assert.fail("ZK Realm should be found for /" + cluster);
-      } else {
-        Assert.assertTrue(e.getMessage()
-            .startsWith("No sharding key found within the provided path. Path: /" + cluster));
-      }
-    } catch (IllegalArgumentException e) {
-      if (shouldSucceed) {
-        Assert.fail(formPath(cluster) + " should be a valid sharding key.");
-      } else {
-        String messageOne = "Given path: /" + cluster + " does not have a "
-            + "valid sharding key or its ZK sharding key is not found in the cached routing data";
-        String messageTwo = "Given path: /" + cluster + "'s ZK sharding key: /" + cluster
-            + " does not match the ZK sharding key";
-        Assert.assertTrue(
-            e.getMessage().startsWith(messageOne) || e.getMessage().startsWith(messageTwo));
-      }
-    } catch (HelixException e) {
-      // NoSuchElementException: "ZK Realm not found!" is swallowed in ZKUtil.isClusterSetup()
-      // Instead, HelixException is thrown.
-      if (shouldSucceed) {
-        Assert.fail("Cluster: " + cluster + " should have been setup.");
-      } else {
-        Assert.assertEquals("fail to get config. cluster: " + cluster + " is NOT setup.",
-            e.getMessage());
-      }
-    }
-  }
-
-  private String formPath(String... pathNames) {
-    StringBuilder sb = new StringBuilder();
-    for (String name : pathNames) {
-      sb.append('/').append(name);
-    }
-    return sb.toString();
-  }
-
-  /**
-   * Testing using ZK as the routing data source. We use BaseDataAccessor as the representative
-   * Helix API.
-   * Two modes are tested: ZK and HTTP-ZK fallback
-   */
-  @Test(dependsOnMethods = "testDifferentMsdsEndpointConfigs")
-  public void testZkRoutingDataSourceConfigs() {
-    String methodName = TestHelper.getTestMethodName();
-    System.out.println("START " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
-
-    // Set up routing data in ZK by connecting directly to ZK
-    BaseDataAccessor<ZNRecord> accessor =
-        new ZkBaseDataAccessor.Builder<ZNRecord>().setZkAddress(ZK_PREFIX + ZK_START_PORT).build();
-
-    // Create ZK realm routing data ZNRecord
-    _rawRoutingData.forEach((realm, keys) -> {
-      ZNRecord znRecord = new ZNRecord(realm);
-      znRecord.setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY,
-          new ArrayList<>(keys));
-      accessor.set(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realm, znRecord,
-          AccessOption.PERSISTENT);
-    });
-
-    // Create connection configs with the source type set to each type
-    final RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder =
-        new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
-    final RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfigZk =
-        connectionConfigBuilder.setRoutingDataSourceType(RoutingDataReaderType.ZK.name())
-            .setRoutingDataSourceEndpoint(ZK_PREFIX + ZK_START_PORT).build();
-    final RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfigHttpZkFallback =
-        connectionConfigBuilder
-            .setRoutingDataSourceType(RoutingDataReaderType.HTTP_ZK_FALLBACK.name())
-            .setRoutingDataSourceEndpoint(_msdsEndpoint + "," + ZK_PREFIX + ZK_START_PORT).build();
-    final RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfigHttp =
-        connectionConfigBuilder.setRoutingDataSourceType(RoutingDataReaderType.HTTP.name())
-            .setRoutingDataSourceEndpoint(_msdsEndpoint).build();
-
-    // Reset cached routing data
-    RoutingDataManager.getInstance().reset(true);
-    // Shutdown MSDS to ensure that these accessors are able to pull routing data from ZK
-    _msds.stopServer();
-
-    // Create a BaseDataAccessor instance with the connection config
-    BaseDataAccessor<ZNRecord> zkBasedAccessor = new ZkBaseDataAccessor.Builder<ZNRecord>()
-        .setRealmAwareZkConnectionConfig(connectionConfigZk).build();
-    BaseDataAccessor<ZNRecord> httpZkFallbackBasedAccessor =
-        new ZkBaseDataAccessor.Builder<ZNRecord>()
-            .setRealmAwareZkConnectionConfig(connectionConfigHttpZkFallback).build();
-    try {
-      BaseDataAccessor<ZNRecord> httpBasedAccessor = new ZkBaseDataAccessor.Builder<ZNRecord>()
-          .setRealmAwareZkConnectionConfig(connectionConfigHttp).build();
-      Assert.fail("Must fail with a MultiZkException because HTTP connection will be refused.");
-    } catch (MultiZkException e) {
-      // Okay
-    }
-
-    // Check that all clusters appear as existing to this accessor
-    CLUSTER_LIST.forEach(cluster -> {
-      Assert.assertTrue(zkBasedAccessor.exists("/" + cluster, AccessOption.PERSISTENT));
-      Assert.assertTrue(httpZkFallbackBasedAccessor.exists("/" + cluster, AccessOption.PERSISTENT));
-    });
-
-    // Close all connections
-    accessor.close();
-    zkBasedAccessor.close();
-    httpZkFallbackBasedAccessor.close();
-
-    System.out.println("END " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
-  }
-}
diff --git a/scripts/runSingleTest.sh b/scripts/runSingleTest.sh
index 094e07d09..69d8791cd 100755
--- a/scripts/runSingleTest.sh
+++ b/scripts/runSingleTest.sh
@@ -39,7 +39,7 @@ do
   echo "======================================================================
 Attempt $i $testName
 ======================================================================"
-  mvn test -o -Dtest=$testName -pl=$project
+  mvn -q test -o -Dtest=$testName -pl=$project
   exitcode=$?
   if [ $exitcode -ne 0 ]
   then