You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/07/05 21:38:05 UTC

[helix] branch master updated: Fix validation logic in GenericApiBuilders and add usage tests (#1127)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f012c2  Fix validation logic in GenericApiBuilders and add usage tests (#1127)
7f012c2 is described below

commit 7f012c265f12b86511ac52d31979fcadb5ef3294
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Sun Jul 5 14:37:54 2020 -0700

    Fix validation logic in GenericApiBuilders and add usage tests (#1127)
    
    Some of the validation logic in GenericBaseDataAccessorBuilder and GenericZkHelixApiBuilder was confusing, and there were no usage tests. This commit reviews the consistency between the validate() methods of the BaseDataAccessor APIs and adds test cases that demonstrate how they should be used.
    
    Also, the timeout setting in the connection config (the session timeout) wasn't being honored in the createZkClient() methods, so this bug was fixed as well.
    
    createZkConnectionConfig() was removed from RealmAwareZkClient because the intended purpose of this method wasn't clear and it was not being used anywhere.
---
 .../manager/zk/GenericBaseDataAccessorBuilder.java |  57 +++---
 .../helix/manager/zk/GenericZkHelixApiBuilder.java |  91 ++++++++-
 .../multizk/TestMultiZkHelixJavaApis.java          | 212 ++++++++++++++++++++-
 .../zookeeper/api/client/RealmAwareZkClient.java   |  28 +--
 .../helix/zookeeper/api/client/ZkClientType.java   |   6 +
 .../impl/factory/SharedZkClientFactory.java        |   2 +
 6 files changed, 330 insertions(+), 66 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/GenericBaseDataAccessorBuilder.java b/helix-core/src/main/java/org/apache/helix/manager/zk/GenericBaseDataAccessorBuilder.java
index 92a4df4..6845802 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/GenericBaseDataAccessorBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/GenericBaseDataAccessorBuilder.java
@@ -82,38 +82,28 @@ public class GenericBaseDataAccessorBuilder<B extends GenericBaseDataAccessorBui
     switch (realmMode) {
       case MULTI_REALM:
         try {
-          if (_zkClientType == ZkClientType.DEDICATED) {
-            // Use a realm-aware dedicated zk client
-            zkClient = DedicatedZkClientFactory.getInstance()
-                .buildZkClient(connectionConfig, clientConfig);
-          } else if (_zkClientType == ZkClientType.SHARED) {
-            // Use a realm-aware shared zk client
-            zkClient =
-                SharedZkClientFactory.getInstance().buildZkClient(connectionConfig, clientConfig);
-          } else {
-            zkClient = new FederatedZkClient(connectionConfig, clientConfig);
-          }
-        } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+          zkClient = new FederatedZkClient(connectionConfig, clientConfig);
+        } catch (IOException | InvalidRoutingDataException e) {
           throw new HelixException("Not able to connect on multi-realm mode.", e);
         }
         break;
-
       case SINGLE_REALM:
+        HelixZkClient.ZkConnectionConfig helixZkConnectionConfig =
+            new HelixZkClient.ZkConnectionConfig(zkAddress)
+                .setSessionTimeout(connectionConfig.getSessionTimeout());
         if (_zkClientType == ZkClientType.DEDICATED) {
           // If DEDICATED, then we use a dedicated HelixZkClient because we must support ephemeral
           // operations
           zkClient = DedicatedZkClientFactory.getInstance()
-              .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
-                  clientConfig.createHelixZkClientConfig());
+              .buildZkClient(helixZkConnectionConfig, clientConfig.createHelixZkClientConfig());
         } else {
-          // if SHARED: Use a SharedZkClient because ZkBaseDataAccessor does not need to
+          // if SHARED or null: Use a SharedZkClient because ZkBaseDataAccessor does not need to
           // do ephemeral operations.
           zkClient = SharedZkClientFactory.getInstance()
-              .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
-                  clientConfig.createHelixZkClientConfig());
-          zkClient
-              .waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+              .buildZkClient(helixZkConnectionConfig, clientConfig.createHelixZkClientConfig());
         }
+        zkClient
+            .waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
         break;
       default:
         throw new HelixException("Invalid RealmMode given: " + realmMode);
@@ -123,23 +113,28 @@ public class GenericBaseDataAccessorBuilder<B extends GenericBaseDataAccessorBui
 
   /**
    * Validate ZkClientType based on RealmMode.
+   * If ZkClientType is DEDICATED or SHARED, the realm mode must be SINGLE-REALM.
+   * If ZkClientType is FEDERATED, the realm mode must be MULTI-REALM.
    * @param zkClientType
    * @param realmMode
    */
   private void validateZkClientType(ZkClientType zkClientType,
       RealmAwareZkClient.RealmMode realmMode) {
-    boolean isZkClientTypeSet = zkClientType != null;
-    // If ZkClientType is set, RealmMode must either be single-realm or not set.
-    if (isZkClientTypeSet && realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM) {
-      throw new HelixException("ZkClientType cannot be set on multi-realm mode!");
-    }
-    // If ZkClientType is not set and realmMode is single-realm, default to SHARED
-    if (!isZkClientTypeSet && realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM) {
-      zkClientType = ZkClientType.SHARED;
+    if (realmMode == null) {
+      // NOTE: GenericZkHelixApiBuilder::validate() is and must be called before this function, so
+      // we could assume that realmMode will not be null. If it is, we throw an exception.
+      throw new HelixException(
+          "GenericBaseDataAccessorBuilder: Cannot validate ZkClient type! RealmMode is null!");
     }
-    if (realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM
-        && zkClientType == ZkClientType.FEDERATED) {
-      throw new HelixException("FederatedZkClient cannot be set on single-realm mode!");
+    // If ZkClientType is DEDICATED or SHARED, the realm mode cannot be multi-realm.
+    // If ZkClientType is FEDERATED, the realm mode cannot be single-realm.
+    if (((zkClientType == ZkClientType.DEDICATED || zkClientType == ZkClientType.SHARED)
+        && realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM) || (
+        zkClientType == ZkClientType.FEDERATED
+            && realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM)) {
+      throw new HelixException(
+          "Invalid combination of ZkClientType and RealmMode: ZkClientType is " + zkClientType
+              .name() + " and realmMode is " + realmMode.name());
     }
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/GenericZkHelixApiBuilder.java b/helix-core/src/main/java/org/apache/helix/manager/zk/GenericZkHelixApiBuilder.java
index f6015c4..89ee5a0 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/GenericZkHelixApiBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/GenericZkHelixApiBuilder.java
@@ -22,12 +22,16 @@ package org.apache.helix.manager.zk;
 import java.io.IOException;
 
 import org.apache.helix.HelixException;
+import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
 import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
 import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
+import org.apache.helix.zookeeper.util.HttpRoutingDataReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -36,11 +40,22 @@ import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
  * @param <B>
  */
 public abstract class GenericZkHelixApiBuilder<B extends GenericZkHelixApiBuilder<B>> {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(GenericZkHelixApiBuilder.class.getName());
+
   protected String _zkAddress;
   protected RealmAwareZkClient.RealmMode _realmMode;
   protected RealmAwareZkClient.RealmAwareZkConnectionConfig _realmAwareZkConnectionConfig;
   protected RealmAwareZkClient.RealmAwareZkClientConfig _realmAwareZkClientConfig;
 
+  /**
+   * Note: If you set the ZK address explicitly, this setting will take priority over the ZK path
+   * sharding key set in RealmAwareZkConnectionConfig.
+   * If this field is null, and the realm mode is single-realm, then it will try to look up the
+   * ZK address based on the ZK path sharding key.
+   * @param zkAddress
+   * @return
+   */
   public B setZkAddress(String zkAddress) {
     _zkAddress = zkAddress;
     return self();
@@ -65,22 +80,59 @@ public abstract class GenericZkHelixApiBuilder<B extends GenericZkHelixApiBuilde
 
   /**
    * Validates the given Builder parameters using a generic validation logic.
+   *
+   * This validation function checks whether realm mode has been set correctly, and
+   * resolves them if not set at all.
+   *
+   * If realm mode is null, we look at zkAddress field or RealmAwareZkClient's connection config to
+   * see if we could deduce the Zk address based on the MetadataStoreRoutingData.
+   *
+   * Note: If you set the ZK address explicitly, this setting will take priority over the ZK path
+   * sharding key set in RealmAwareZkConnectionConfig.
+   * If this field is null, and the realm mode is single-realm, then it will try to look up the
+   * ZK address based on the ZK path sharding key.
    */
   protected void validate() {
+    initializeConfigsIfNull();
+
     // Resolve RealmMode based on whether ZK address has been set
     boolean isZkAddressSet = _zkAddress != null && !_zkAddress.isEmpty();
+
+    // If realmMode is single-realm (in other words, ZkAddress is needed) and zk address is not
+    // given, then try to look up ZK address if ZK realm sharding key is set
+    // If realmMode is multi-realm, make sure it's not tied to a single sharding key
+    if (!isZkAddressSet && _realmAwareZkConnectionConfig.getZkRealmShardingKey() != null
+        && !_realmAwareZkConnectionConfig.getZkRealmShardingKey().isEmpty()) {
+      if (_realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM) {
+        // Try to resolve the zk address using the zk path sharding key if given
+        try {
+          _zkAddress = resolveZkAddressWithShardingKey(_realmAwareZkConnectionConfig);
+          isZkAddressSet = true;
+        } catch (IOException | InvalidRoutingDataException e) {
+          LOG.warn(
+              "GenericZkHelixApiBuilder: ZkAddress is not set and failed to resolve ZkAddress with ZK path sharding key!",
+              e);
+        }
+      } else if (_realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM) {
+        // Multi-realm and a single sharding key cannot coexist (by definition, multi-realm can access multiple sharding keys)
+        throw new HelixException(
+            "GenericZkHelixApiBuilder: Cannot have a ZK path sharding key in ConnectionConfig on multi-realm mode! Multi-realm accesses multiple sharding keys.");
+      }
+    }
+
     if (_realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM && !isZkAddressSet) {
-      throw new HelixException("RealmMode cannot be single-realm without a valid ZkAddress set!");
+      throw new HelixException(
+          "GenericZkHelixApiBuilder: RealmMode cannot be single-realm without a valid ZkAddress set!");
     }
     if (_realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM && isZkAddressSet) {
-      throw new HelixException("ZkAddress cannot be set on multi-realm mode!");
+      throw new HelixException(
+          "GenericZkHelixApiBuilder: ZkAddress cannot be set on multi-realm mode!");
     }
+
     if (_realmMode == null) {
       _realmMode = isZkAddressSet ? RealmAwareZkClient.RealmMode.SINGLE_REALM
           : RealmAwareZkClient.RealmMode.MULTI_REALM;
     }
-
-    initializeConfigsIfNull();
   }
 
   /**
@@ -114,16 +166,18 @@ public abstract class GenericZkHelixApiBuilder<B extends GenericZkHelixApiBuilde
           return new FederatedZkClient(connectionConfig,
               clientConfig.setZkSerializer(new ZNRecordSerializer()));
         } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
-          throw new HelixException("Failed to create FederatedZkClient!", e);
+          throw new HelixException("GenericZkHelixApiBuilder: Failed to create FederatedZkClient!",
+              e);
         }
       case SINGLE_REALM:
         // Create a HelixZkClient: Use a SharedZkClient because ClusterSetup does not need to do
         // ephemeral operations
-        return SharedZkClientFactory.getInstance()
-            .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
-                clientConfig.createHelixZkClientConfig().setZkSerializer(new ZNRecordSerializer()));
+        return SharedZkClientFactory.getInstance().buildZkClient(
+            new HelixZkClient.ZkConnectionConfig(zkAddress)
+                .setSessionTimeout(connectionConfig.getSessionTimeout()),
+            clientConfig.createHelixZkClientConfig().setZkSerializer(new ZNRecordSerializer()));
       default:
-        throw new HelixException("Invalid RealmMode given: " + realmMode);
+        throw new HelixException("GenericZkHelixApiBuilder: Invalid RealmMode given: " + realmMode);
     }
   }
 
@@ -136,4 +190,23 @@ public abstract class GenericZkHelixApiBuilder<B extends GenericZkHelixApiBuilde
   final B self() {
     return (B) this;
   }
+
+  /**
+   * Resolve Zk address based on the zk realm sharding key. This method is only used if the
+   * ZK address is not given in this Builder.
+   * @param connectionConfig
+   * @return
+   * @throws IOException
+   * @throws InvalidRoutingDataException
+   */
+  private String resolveZkAddressWithShardingKey(
+      RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig)
+      throws IOException, InvalidRoutingDataException {
+    boolean isMsdsEndpointSet =
+        connectionConfig.getMsdsEndpoint() != null && !connectionConfig.getMsdsEndpoint().isEmpty();
+    MetadataStoreRoutingData routingData = isMsdsEndpointSet ? HttpRoutingDataReader
+        .getMetadataStoreRoutingData(connectionConfig.getMsdsEndpoint())
+        : HttpRoutingDataReader.getMetadataStoreRoutingData();
+    return routingData.getMetadataStoreRealm(connectionConfig.getZkRealmShardingKey());
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
index 4395586..57f250e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
@@ -69,10 +69,12 @@ import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.api.client.ZkClientType;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
 import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
+import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
 import org.apache.helix.zookeeper.zkclient.ZkServer;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -182,6 +184,12 @@ public class TestMultiZkHelixJavaApis {
 
       // Stop MockMSDS
       _msds.stopServer();
+
+      // Close ZK client connections
+      _zkHelixAdmin.close();
+      if (_zkClient != null && !_zkClient.isClosed()) {
+        _zkClient.close();
+      }
     } finally {
       // Restore System property configs
       if (_configStore.containsKey(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
@@ -218,6 +226,9 @@ public class TestMultiZkHelixJavaApis {
 
     // Create clusters again to continue with testing
     createClusters(clusterSetupBuilder);
+
+    clusterSetupZkAddr.close();
+    clusterSetupBuilder.close();
   }
 
   private void createClusters(ClusterSetup clusterSetup) {
@@ -255,7 +266,7 @@ public class TestMultiZkHelixJavaApis {
     // Note: ZK Address here could be anything because multiZk mode is on (it will be ignored)
     HelixAdmin helixAdminZkAddr = new ZKHelixAdmin(ZK_SERVER_MAP.keySet().iterator().next());
     HelixAdmin helixAdminBuilder = new ZKHelixAdmin.Builder().build();
-    _zkHelixAdmin = helixAdminBuilder;
+    _zkHelixAdmin = new ZKHelixAdmin.Builder().build();
 
     String participantNamePrefix = "Node_";
     int numParticipants = 5;
@@ -294,6 +305,9 @@ public class TestMultiZkHelixJavaApis {
           .verify(() -> helixAdminBuilder.getInstancesInCluster(cluster).size() == numParticipants,
               TestHelper.WAIT_DURATION));
     }
+
+    helixAdminZkAddr.close();
+    helixAdminBuilder.close();
   }
 
   private void createParticipantsAndVerify(HelixAdmin admin, int numParticipants,
@@ -403,6 +417,9 @@ public class TestMultiZkHelixJavaApis {
             .assertEquals(is.getSimpleFields(), savedIdealStates.get(is.getId()).getSimpleFields());
       });
     }
+
+    dataAccessorZkAddr.close();
+    dataAccessorBuilder.close();
   }
 
   /**
@@ -418,6 +435,9 @@ public class TestMultiZkHelixJavaApis {
 
     setClusterConfigAndVerify(configAccessorZkAddr);
     setClusterConfigAndVerify(configAccessorBuilder);
+
+    configAccessorZkAddr.close();
+    configAccessorBuilder.close();
   }
 
   private void setClusterConfigAndVerify(ConfigAccessor configAccessorMultiZk) {
@@ -493,6 +513,194 @@ public class TestMultiZkHelixJavaApis {
   }
 
   /**
+   * This method tests that GenericBaseDataAccessorBuilder and GenericZkHelixApiBuilder work as
+   * expected. This test focuses on various usage scenarios for ZkBaseDataAccessor.
+   *
+   * Use cases tested:
+   * - Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, ZK address set
+   * - Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, ZK address not set, ZK sharding key set
+   * - Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, ZK address set, ZK sharding key set (ZK addr should override)
+   * - Create ZkBaseDataAccessor, single-realm, sharedZkClient, ZK address set
+   * - Create ZkBaseDataAccessor, single-realm, sharedZkClient, ZK address not set, ZK sharding key set
+   * - Create ZkBaseDataAccessor, single-realm, sharedZkClient, ZK address set, ZK sharding key set (ZK addr should override)
+   * - Create ZkBaseDataAccessor, single-realm, federated ZkClient (should fail)
+   * - Create ZkBaseDataAccessor, multi-realm, dedicated ZkClient (should fail)
+   * - Create ZkBaseDataAccessor, multi-realm, shared ZkClient (should fail)
+   * - Create ZkBaseDataAccessor, multi-realm, federated ZkClient, ZkAddress set (should fail)
+   * - Create ZkBaseDataAccessor, multi-realm, federated ZkClient, Zk sharding key set (should fail because by definition, multi-realm can access multiple sharding keys)
+   * - Create ZkBaseDataAccessor, multi-realm, federated ZkClient
+   * - Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, No ZkAddress set, ConnectionConfig has an invalid ZK sharding key (should fail because it cannot find a valid ZK to connect to)
+   */
+  @Test(dependsOnMethods = "testGetAllClusters")
+  public void testGenericBaseDataAccessorBuilder() {
+    // Custom session timeout value is used to count active connections in SharedZkClientFactory
+    int customSessionTimeout = 10000;
+    String firstZkAddress = "localhost:8777"; // has "CLUSTER_1"
+    String firstClusterPath = "/CLUSTER_1";
+    String secondClusterPath = "/CLUSTER_2";
+    ZkBaseDataAccessor.Builder<ZNRecord> zkBaseDataAccessorBuilder =
+        new ZkBaseDataAccessor.Builder<>();
+    RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder =
+        new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
+    connectionConfigBuilder.setSessionTimeout(customSessionTimeout);
+    BaseDataAccessor<ZNRecord> accessor;
+
+    // Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, ZK address set
+    int currentSharedZkClientActiveConnectionCount =
+        SharedZkClientFactory.getInstance().getActiveConnectionCount();
+    accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+        .setZkClientType(ZkClientType.DEDICATED).setZkAddress(firstZkAddress)
+        .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
+    Assert.assertTrue(accessor.exists(firstClusterPath, AccessOption.PERSISTENT));
+    Assert.assertFalse(accessor.exists(secondClusterPath, AccessOption.PERSISTENT));
+    // Check that no extra connection has been created
+    Assert.assertEquals(SharedZkClientFactory.getInstance().getActiveConnectionCount(),
+        currentSharedZkClientActiveConnectionCount);
+    accessor.close();
+
+    // Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, ZK address not set, ZK sharding key set
+    connectionConfigBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+        .setZkRealmShardingKey(firstClusterPath);
+    accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+        .setZkClientType(ZkClientType.DEDICATED).setZkAddress(null)
+        .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
+    Assert.assertTrue(accessor.exists(firstClusterPath, AccessOption.PERSISTENT));
+    Assert.assertFalse(accessor.exists(secondClusterPath, AccessOption.PERSISTENT));
+    Assert.assertEquals(SharedZkClientFactory.getInstance().getActiveConnectionCount(),
+        currentSharedZkClientActiveConnectionCount);
+    accessor.close();
+
+    // Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, ZK address set, ZK sharding key set (ZK addr should override)
+    connectionConfigBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+        .setZkRealmShardingKey(secondClusterPath);
+    accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+        .setZkClientType(ZkClientType.DEDICATED).setZkAddress(firstZkAddress)
+        .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
+    Assert.assertTrue(accessor.exists(firstClusterPath, AccessOption.PERSISTENT));
+    Assert.assertFalse(accessor.exists(secondClusterPath, AccessOption.PERSISTENT));
+    Assert.assertEquals(SharedZkClientFactory.getInstance().getActiveConnectionCount(),
+        currentSharedZkClientActiveConnectionCount);
+    accessor.close();
+
+    // Create ZkBaseDataAccessor, single-realm, sharedZkClient, ZK address set
+    currentSharedZkClientActiveConnectionCount =
+        SharedZkClientFactory.getInstance().getActiveConnectionCount();
+    connectionConfigBuilder.setZkRealmShardingKey(null).setRealmMode(null);
+    accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+        .setZkClientType(ZkClientType.SHARED).setZkAddress(firstZkAddress)
+        .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
+    Assert.assertTrue(accessor.exists(firstClusterPath, AccessOption.PERSISTENT));
+    Assert.assertFalse(accessor.exists(secondClusterPath, AccessOption.PERSISTENT));
+    // Add one to active connection count since this is a shared ZkClientType
+    Assert.assertEquals(SharedZkClientFactory.getInstance().getActiveConnectionCount(),
+        currentSharedZkClientActiveConnectionCount + 1);
+    accessor.close();
+
+    // Create ZkBaseDataAccessor, single-realm, sharedZkClient, ZK address not set, ZK sharding key set
+    connectionConfigBuilder.setZkRealmShardingKey(firstClusterPath)
+        .setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM);
+    accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+        .setZkClientType(ZkClientType.SHARED).setZkAddress(null)
+        .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
+    Assert.assertTrue(accessor.exists(firstClusterPath, AccessOption.PERSISTENT));
+    Assert.assertFalse(accessor.exists(secondClusterPath, AccessOption.PERSISTENT));
+    // Add one to active connection count since this is a shared ZkClientType
+    Assert.assertEquals(SharedZkClientFactory.getInstance().getActiveConnectionCount(),
+        currentSharedZkClientActiveConnectionCount + 1);
+    accessor.close();
+
+    // Create ZkBaseDataAccessor, single-realm, sharedZkClient, ZK address set, ZK sharding key set
+    // (ZK address should override the sharding key setting)
+    connectionConfigBuilder.setZkRealmShardingKey(secondClusterPath)
+        .setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM);
+    accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+        .setZkClientType(ZkClientType.SHARED).setZkAddress(firstZkAddress)
+        .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
+    Assert.assertTrue(accessor.exists(firstClusterPath, AccessOption.PERSISTENT));
+    Assert.assertFalse(accessor.exists(secondClusterPath, AccessOption.PERSISTENT));
+    // Add one to active connection count since this is a shared ZkClientType
+    Assert.assertEquals(SharedZkClientFactory.getInstance().getActiveConnectionCount(),
+        currentSharedZkClientActiveConnectionCount + 1);
+    accessor.close();
+
+    // Create ZkBaseDataAccessor, single-realm, federated ZkClient (should fail)
+    connectionConfigBuilder.setZkRealmShardingKey(null).setRealmMode(null);
+    try {
+      accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+          .setZkClientType(ZkClientType.FEDERATED).setZkAddress(firstZkAddress)
+          .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
+      Assert.fail("SINGLE_REALM and FEDERATED ZkClientType are an invalid combination!");
+    } catch (HelixException e) {
+      // Expected
+    }
+
+    // Create ZkBaseDataAccessor, multi-realm, dedicated ZkClient (should fail)
+    try {
+      accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.MULTI_REALM)
+          .setZkClientType(ZkClientType.DEDICATED).setZkAddress(firstZkAddress)
+          .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
+      Assert.fail("MULTI_REALM and DEDICATED ZkClientType are an invalid combination!");
+    } catch (HelixException e) {
+      // Expected
+    }
+
+    // Create ZkBaseDataAccessor, multi-realm, shared ZkClient (should fail)
+    try {
+      accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.MULTI_REALM)
+          .setZkClientType(ZkClientType.SHARED).setZkAddress(firstZkAddress)
+          .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
+      Assert.fail("MULTI_REALM and SHARED ZkClientType are an invalid combination!");
+    } catch (HelixException e) {
+      // Expected
+    }
+
+    // Create ZkBaseDataAccessor, multi-realm, federated ZkClient, ZkAddress set (should fail)
+    try {
+      accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.MULTI_REALM)
+          .setZkClientType(ZkClientType.FEDERATED).setZkAddress(firstZkAddress)
+          .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
+      Assert.fail("MULTI_REALM and FEDERATED ZkClientType do not connect to one ZK!");
+    } catch (HelixException e) {
+      // Expected
+    }
+
+    // Create ZkBaseDataAccessor, multi-realm, federated ZkClient, Zk sharding key set (should fail
+    // because by definition, multi-realm can access multiple sharding keys)
+    try {
+      connectionConfigBuilder.setZkRealmShardingKey(firstClusterPath)
+          .setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM);
+      accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.MULTI_REALM)
+          .setZkClientType(ZkClientType.FEDERATED).setZkAddress(null)
+          .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
+      Assert.fail("MULTI_REALM and FEDERATED ZkClientType do not connect to one ZK!");
+    } catch (HelixException e) {
+      // Expected
+    }
+
+    // Create ZkBaseDataAccessor, multi-realm, federated ZkClient
+    connectionConfigBuilder.setZkRealmShardingKey(null).setRealmMode(null);
+    accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.MULTI_REALM)
+        .setZkClientType(ZkClientType.FEDERATED).setZkAddress(null)
+        .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
+    Assert.assertTrue(accessor.exists(firstClusterPath, AccessOption.PERSISTENT));
+    Assert.assertTrue(accessor.exists(secondClusterPath, AccessOption.PERSISTENT));
+    accessor.close();
+
+    // Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, No ZkAddress set,
+    // ConnectionConfig has an invalid ZK sharding key (should fail because it cannot find a valid
+    // ZK to connect to)
+    connectionConfigBuilder.setZkRealmShardingKey("/NonexistentShardingKey");
+    try {
+      accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+          .setZkClientType(ZkClientType.DEDICATED).setZkAddress(null)
+          .setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
+      Assert.fail("Should fail because it cannot find a valid ZK to connect to!");
+    } catch (NoSuchElementException e) {
+      // Expected because the sharding key wouldn't be found
+    }
+  }
+
+  /**
    * Tests Helix Java APIs which use different MSDS endpoint configs. Java API should
    * only connect to the configured MSDS but not the others. The APIs are explicitly tested are:
    * - ClusterSetup
@@ -502,7 +710,7 @@ public class TestMultiZkHelixJavaApis {
    * - BaseDataAccessor
    * - ConfigAccessor
    */
-  @Test(dependsOnMethods = "testGetAllClusters")
+  @Test(dependsOnMethods = "testGenericBaseDataAccessorBuilder")
   public void testDifferentMsdsEndpointConfigs() throws IOException, InvalidRoutingDataException {
     String methodName = TestHelper.getTestMethodName();
     System.out.println("Start " + methodName);
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
index 84f52b9..a777fd0 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
@@ -383,7 +383,7 @@ public interface RealmAwareZkClient {
      */
     private String _zkRealmShardingKey;
     private String _msdsEndpoint;
-    private int _sessionTimeout;
+    private int _sessionTimeout = DEFAULT_SESSION_TIMEOUT;
 
     private RealmAwareZkConnectionConfig(Builder builder) {
       _zkRealmShardingKey = builder._zkRealmShardingKey;
@@ -432,27 +432,6 @@ public interface RealmAwareZkClient {
       return _msdsEndpoint;
     }
 
-    public HelixZkClient.ZkConnectionConfig createZkConnectionConfig()
-        throws IOException, InvalidRoutingDataException {
-      // Convert to a single-realm HelixZkClient's ZkConnectionConfig
-      if (_zkRealmShardingKey == null || _zkRealmShardingKey.isEmpty()) {
-        throw new ZkClientException(
-            "Cannot create ZkConnectionConfig because ZK realm sharding key is either null or empty!");
-      }
-
-      String zkAddress;
-      // Look up the ZK address for the given ZK realm sharding key
-      if (_msdsEndpoint == null || _msdsEndpoint.isEmpty()) {
-        zkAddress = HttpRoutingDataReader.getMetadataStoreRoutingData()
-            .getMetadataStoreRealm(_zkRealmShardingKey);
-      } else {
-        zkAddress = HttpRoutingDataReader.getMetadataStoreRoutingData(_msdsEndpoint)
-            .getMetadataStoreRealm(_zkRealmShardingKey);
-      }
-
-      return new HelixZkClient.ZkConnectionConfig(zkAddress).setSessionTimeout(_sessionTimeout);
-    }
-
     public static class Builder {
       private RealmMode _realmMode;
       private String _zkRealmShardingKey;
@@ -500,11 +479,12 @@ public interface RealmAwareZkClient {
         boolean isShardingKeySet = _zkRealmShardingKey != null && !_zkRealmShardingKey.isEmpty();
         if (_realmMode == RealmMode.MULTI_REALM && isShardingKeySet) {
           throw new IllegalArgumentException(
-              "ZK sharding key cannot be set on multi-realm mode! Sharding key: "
+              "RealmAwareZkConnectionConfig.Builder: ZK sharding key cannot be set on multi-realm mode! Sharding key: "
                   + _zkRealmShardingKey);
         }
         if (_realmMode == RealmMode.SINGLE_REALM && !isShardingKeySet) {
-          throw new IllegalArgumentException("ZK sharding key must be set on single-realm mode!");
+          throw new IllegalArgumentException(
+              "RealmAwareZkConnectionConfig.Builder: ZK sharding key must be set on single-realm mode!");
         }
       }
     }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/ZkClientType.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/ZkClientType.java
index 4db06a8..3a1f917 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/ZkClientType.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/ZkClientType.java
@@ -25,18 +25,24 @@ public enum ZkClientType {
    * creation, callback functionality, and session management. But note that this is more
    * resource-heavy since it creates a dedicated ZK connection so should be used sparingly only
    * when the aforementioned features are needed.
+   *
+   * Valid on SINGLE_REALM only.
    */
   DEDICATED,
 
   /**
    * If a Helix API is created with the SHARED type, it only supports CRUD
    * functionalities. This will be the default mode of creation.
+   *
+   * Valid on SINGLE_REALM only.
    */
   SHARED,
 
   /**
    * Uses FederatedZkClient (applicable on multi-realm mode only) that queries Metadata Store
    * Directory Service for routing data.
+   *
+   * Valid on MULTI_REALM only.
    */
   FEDERATED
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/SharedZkClientFactory.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/SharedZkClientFactory.java
index fb46ef2..e6199ed 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/SharedZkClientFactory.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/SharedZkClientFactory.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
@@ -121,6 +122,7 @@ public class SharedZkClientFactory extends HelixZkClientFactory {
   }
 
   // For testing purposes only
+  @VisibleForTesting
   public int getActiveConnectionCount() {
     int count = 0;
     synchronized (_connectionManagerPool) {