You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/04/08 22:54:10 UTC

[helix] 40/50: Reformat ZkBaseDataAccessor (#893)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch zooscalability_merge
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 8adaf98fd216597a22dbbbae1ff9116734f3cd95
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Fri Mar 13 17:19:34 2020 -0700

    Reformat ZkBaseDataAccessor (#893)
    
    Changelist:
    1. Add generic type markers to Builder (<T>)
    2. Fix a bug in validate function
    3. Default to ZNRecordSerializer to preserve existing behavior
---
 .../helix/manager/zk/ZkBaseDataAccessor.java       | 301 +++++++++++----------
 1 file changed, 159 insertions(+), 142 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index 1d60c7b..32f33f8 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -52,6 +52,7 @@ import org.apache.helix.zookeeper.zkclient.exception.ZkException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
 import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.data.Stat;
@@ -59,6 +60,7 @@ import org.apache.zookeeper.server.DataTree;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
 
   // Designates which mode ZkBaseDataAccessor should be created in. If not specified, it will be
@@ -139,7 +141,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
     _usesExternalZkClient = true;
   }
 
-  private ZkBaseDataAccessor(Builder builder) {
+  private ZkBaseDataAccessor(Builder<T> builder) {
     switch (builder.realmMode) {
       case MULTI_REALM:
         try {
@@ -397,16 +399,16 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
           result._pathCreated.addAll(res._pathCreated);
           RetCode rc = res._retCode;
           switch (rc) {
-          case OK:
-            // not set stat if node is created (instead of set)
-            break;
-          case NODE_EXISTS:
-            retry = true;
-            break;
-          default:
-            LOG.error("Fail to set path by creating: " + path);
-            result._retCode = RetCode.ERROR;
-            return result;
+            case OK:
+              // not set stat if node is created (instead of set)
+              break;
+            case NODE_EXISTS:
+              retry = true;
+              break;
+            default:
+              LOG.error("Fail to set path by creating: " + path);
+              result._retCode = RetCode.ERROR;
+              return result;
           }
         } catch (Exception e1) {
           LOG.error("Exception while setting path by creating: " + path, e);
@@ -477,16 +479,16 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
             rc = RetCode.OK;
           }
           switch (rc) {
-          case OK:
-            updatedData = newData;
-            break;
-          case NODE_EXISTS:
-            retry = true;
-            break;
-          default:
-            LOG.error("Fail to update path by creating: " + path);
-            result._retCode = RetCode.ERROR;
-            return result;
+            case OK:
+              updatedData = newData;
+              break;
+            case NODE_EXISTS:
+              retry = true;
+              break;
+            default:
+              LOG.error("Fail to update path by creating: " + path);
+              result._retCode = RetCode.ERROR;
+              return result;
           }
         } catch (Exception e1) {
           LOG.error("Exception while updating path by creating: " + path, e1);
@@ -553,17 +555,19 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
     // init stats
     if (stats != null) {
       stats.clear();
-      stats.addAll(Collections.<Stat> nCopies(paths.size(), null));
+      stats.addAll(Collections.<Stat>nCopies(paths.size(), null));
     }
 
     long startT = System.nanoTime();
 
     try {
       // issue asyn get requests
-      ZkAsyncCallbacks.GetDataCallbackHandler[] cbList = new ZkAsyncCallbacks.GetDataCallbackHandler[paths.size()];
+      ZkAsyncCallbacks.GetDataCallbackHandler[] cbList =
+          new ZkAsyncCallbacks.GetDataCallbackHandler[paths.size()];
       for (int i = 0; i < paths.size(); i++) {
-        if (!needRead[i])
+        if (!needRead[i]) {
           continue;
+        }
 
         String path = paths.get(i);
         cbList[i] = new ZkAsyncCallbacks.GetDataCallbackHandler();
@@ -572,19 +576,21 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
 
       // wait for completion
       for (int i = 0; i < cbList.length; i++) {
-        if (!needRead[i])
+        if (!needRead[i]) {
           continue;
+        }
 
         ZkAsyncCallbacks.GetDataCallbackHandler cb = cbList[i];
         cb.waitForSuccess();
       }
 
       // construct return results
-      List<T> records = new ArrayList<T>(Collections.<T> nCopies(paths.size(), null));
+      List<T> records = new ArrayList<T>(Collections.<T>nCopies(paths.size(), null));
       Map<String, Integer> pathFailToRead = new HashMap<>();
       for (int i = 0; i < paths.size(); i++) {
-        if (!needRead[i])
+        if (!needRead[i]) {
           continue;
+        }
 
         ZkAsyncCallbacks.GetDataCallbackHandler cb = cbList[i];
         if (Code.get(cb.getRc()) == Code.OK) {
@@ -610,8 +616,9 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
     } finally {
       long endT = System.nanoTime();
       if (LOG.isTraceEnabled()) {
-        LOG.trace("getData_async, size: " + paths.size() + ", paths: " + paths.get(0)
-            + ",... time: " + (endT - startT) + " ns");
+        LOG.trace(
+            "getData_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: " + (
+                endT - startT) + " ns");
       }
     }
   }
@@ -752,15 +759,16 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
   /**
    * async create. give up on error other than NONODE
    */
-  ZkAsyncCallbacks.CreateCallbackHandler[] create(List<String> paths, List<T> records, boolean[] needCreate,
-      List<List<String>> pathsCreated, int options) {
+  ZkAsyncCallbacks.CreateCallbackHandler[] create(List<String> paths, List<T> records,
+      boolean[] needCreate, List<List<String>> pathsCreated, int options) {
     if ((records != null && records.size() != paths.size()) || needCreate.length != paths.size()
         || (pathsCreated != null && pathsCreated.size() != paths.size())) {
       throw new IllegalArgumentException(
           "paths, records, needCreate, and pathsCreated should be of same size");
     }
 
-    ZkAsyncCallbacks.CreateCallbackHandler[] cbList = new ZkAsyncCallbacks.CreateCallbackHandler[paths.size()];
+    ZkAsyncCallbacks.CreateCallbackHandler[] cbList =
+        new ZkAsyncCallbacks.CreateCallbackHandler[paths.size()];
 
     CreateMode mode = AccessOption.getMode(options);
     if (mode == null) {
@@ -773,8 +781,9 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
       retry = false;
 
       for (int i = 0; i < paths.size(); i++) {
-        if (!needCreate[i])
+        if (!needCreate[i]) {
           continue;
+        }
 
         String path = paths.get(i);
         T record = records == null ? null : records.get(i);
@@ -782,12 +791,13 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
         _zkClient.asyncCreate(path, record, mode, cbList[i]);
       }
 
-      List<String> parentPaths = new ArrayList<>(Collections.<String> nCopies(paths.size(), null));
+      List<String> parentPaths = new ArrayList<>(Collections.<String>nCopies(paths.size(), null));
       boolean failOnNoNode = false;
 
       for (int i = 0; i < paths.size(); i++) {
-        if (!needCreate[i])
+        if (!needCreate[i]) {
           continue;
+        }
 
         ZkAsyncCallbacks.CreateCallbackHandler cb = cbList[i];
         cb.waitForSuccess();
@@ -819,8 +829,9 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
             create(parentPaths, null, needCreateParent, pathsCreated, AccessOption.PERSISTENT);
         for (int i = 0; i < parentCbList.length; i++) {
           ZkAsyncCallbacks.CreateCallbackHandler parentCb = parentCbList[i];
-          if (parentCb == null)
+          if (parentCb == null) {
             continue;
+          }
 
           Code rc = Code.get(parentCb.getRc());
 
@@ -853,12 +864,13 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
     boolean[] needCreate = new boolean[paths.size()];
     Arrays.fill(needCreate, true);
     List<List<String>> pathsCreated =
-        new ArrayList<>(Collections.<List<String>> nCopies(paths.size(), null));
+        new ArrayList<>(Collections.<List<String>>nCopies(paths.size(), null));
 
     long startT = System.nanoTime();
     try {
 
-      ZkAsyncCallbacks.CreateCallbackHandler[] cbList = create(paths, records, needCreate, pathsCreated, options);
+      ZkAsyncCallbacks.CreateCallbackHandler[] cbList =
+          create(paths, records, needCreate, pathsCreated, options);
 
       for (int i = 0; i < cbList.length; i++) {
         ZkAsyncCallbacks.CreateCallbackHandler cb = cbList[i];
@@ -866,12 +878,12 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
       }
 
       return success;
-
     } finally {
       long endT = System.nanoTime();
       if (LOG.isTraceEnabled()) {
-        LOG.trace("create_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: "
-            + (endT - startT) + " ns");
+        LOG.trace(
+            "create_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: " + (
+                endT - startT) + " ns");
       }
     }
   }
@@ -894,8 +906,8 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
       return new boolean[0];
     }
 
-    if ((records != null && records.size() != paths.size())
-        || (pathsCreated != null && pathsCreated.size() != paths.size())) {
+    if ((records != null && records.size() != paths.size()) || (pathsCreated != null
+        && pathsCreated.size() != paths.size())) {
       throw new IllegalArgumentException("paths, records, and pathsCreated should be of same size");
     }
 
@@ -907,8 +919,9 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
       return success;
     }
 
-    List<Stat> setStats = new ArrayList<>(Collections.<Stat> nCopies(paths.size(), null));
-    ZkAsyncCallbacks.SetDataCallbackHandler[] cbList = new ZkAsyncCallbacks.SetDataCallbackHandler[paths.size()];
+    List<Stat> setStats = new ArrayList<>(Collections.<Stat>nCopies(paths.size(), null));
+    ZkAsyncCallbacks.SetDataCallbackHandler[] cbList =
+        new ZkAsyncCallbacks.SetDataCallbackHandler[paths.size()];
     ZkAsyncCallbacks.CreateCallbackHandler[] createCbList = null;
     boolean[] needSet = new boolean[paths.size()];
     Arrays.fill(needSet, true);
@@ -921,14 +934,14 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
         retry = false;
 
         for (int i = 0; i < paths.size(); i++) {
-          if (!needSet[i])
+          if (!needSet[i]) {
             continue;
+          }
 
           String path = paths.get(i);
           T record = records.get(i);
           cbList[i] = new ZkAsyncCallbacks.SetDataCallbackHandler();
           _zkClient.asyncSetData(path, record, -1, cbList[i]);
-
         }
 
         boolean failOnNoNode = false;
@@ -938,18 +951,18 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
           cb.waitForSuccess();
           Code rc = Code.get(cb.getRc());
           switch (rc) {
-          case OK:
-            setStats.set(i, cb.getStat());
-            needSet[i] = false;
-            break;
-          case NONODE:
-            // if fail on NoNode, try create the node
-            failOnNoNode = true;
-            break;
-          default:
-            // if fail on error other than NoNode, give up
-            needSet[i] = false;
-            break;
+            case OK:
+              setStats.set(i, cb.getStat());
+              needSet[i] = false;
+              break;
+            case NONODE:
+              // if fail on NoNode, try create the node
+              failOnNoNode = true;
+              break;
+            default:
+              // if fail on error other than NoNode, give up
+              needSet[i] = false;
+              break;
           }
         }
 
@@ -965,18 +978,18 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
 
             Code rc = Code.get(createCb.getRc());
             switch (rc) {
-            case OK:
-              setStats.set(i, ZNode.ZERO_STAT);
-              needSet[i] = false;
-              break;
-            case NODEEXISTS:
-              retry = true;
-              break;
-            default:
-              // if creation fails on error other than NodeExists
-              // no need to retry set
-              needSet[i] = false;
-              break;
+              case OK:
+                setStats.set(i, ZNode.ZERO_STAT);
+                needSet[i] = false;
+                break;
+              case NODEEXISTS:
+                retry = true;
+                break;
+              default:
+                // if creation fails on error other than NodeExists
+                // no need to retry set
+                needSet[i] = false;
+                break;
             }
           }
         }
@@ -1006,13 +1019,15 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
     } finally {
       long endT = System.nanoTime();
       if (LOG.isTraceEnabled()) {
-        LOG.trace("setData_async, size: " + paths.size() + ", paths: " + paths.get(0)
-            + ",... time: " + (endT - startT) + " ns");
+        LOG.trace(
+            "setData_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: " + (
+                endT - startT) + " ns");
       }
     }
   }
 
   // TODO: rename to update
+
   /**
    * async update
    */
@@ -1039,14 +1054,14 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
       return Collections.emptyList();
     }
 
-    if (updaters.size() != paths.size()
-        || (pathsCreated != null && pathsCreated.size() != paths.size())) {
+    if (updaters.size() != paths.size() || (pathsCreated != null && pathsCreated.size() != paths
+        .size())) {
       throw new IllegalArgumentException(
           "paths, updaters, and pathsCreated should be of same size");
     }
 
-    List<Stat> setStats = new ArrayList<Stat>(Collections.<Stat> nCopies(paths.size(), null));
-    List<T> updateData = new ArrayList<T>(Collections.<T> nCopies(paths.size(), null));
+    List<Stat> setStats = new ArrayList<Stat>(Collections.<Stat>nCopies(paths.size(), null));
+    List<T> updateData = new ArrayList<T>(Collections.<T>nCopies(paths.size(), null));
 
     CreateMode mode = AccessOption.getMode(options);
     if (mode == null) {
@@ -1054,7 +1069,8 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
       return updateData;
     }
 
-    ZkAsyncCallbacks.SetDataCallbackHandler[] cbList = new ZkAsyncCallbacks.SetDataCallbackHandler[paths.size()];
+    ZkAsyncCallbacks.SetDataCallbackHandler[] cbList =
+        new ZkAsyncCallbacks.SetDataCallbackHandler[paths.size()];
     ZkAsyncCallbacks.CreateCallbackHandler[] createCbList = null;
     boolean[] needUpdate = new boolean[paths.size()];
     Arrays.fill(needUpdate, true);
@@ -1104,29 +1120,30 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
 
         for (int i = 0; i < paths.size(); i++) {
           ZkAsyncCallbacks.SetDataCallbackHandler cb = cbList[i];
-          if (cb == null)
+          if (cb == null) {
             continue;
+          }
 
           cb.waitForSuccess();
 
           switch (Code.get(cb.getRc())) {
-          case OK:
-            updateData.set(i, newDataList.get(i));
-            setStats.set(i, cb.getStat());
-            needUpdate[i] = false;
-            break;
-          case NONODE:
-            failOnNoNode = true;
-            needCreate[i] = true;
-            break;
-          case BADVERSION:
-            failOnBadVersion = true;
-            break;
-          default:
-            // if fail on error other than NoNode or BadVersion
-            // will not retry
-            needUpdate[i] = false;
-            break;
+            case OK:
+              updateData.set(i, newDataList.get(i));
+              setStats.set(i, cb.getStat());
+              needUpdate[i] = false;
+              break;
+            case NONODE:
+              failOnNoNode = true;
+              needCreate[i] = true;
+              break;
+            case BADVERSION:
+              failOnBadVersion = true;
+              break;
+            default:
+              // if fail on error other than NoNode or BadVersion
+              // will not retry
+              needUpdate[i] = false;
+              break;
           }
         }
 
@@ -1140,19 +1157,19 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
             }
 
             switch (Code.get(createCb.getRc())) {
-            case OK:
-              needUpdate[i] = false;
-              updateData.set(i, newDataList.get(i));
-              setStats.set(i, ZNode.ZERO_STAT);
-              break;
-            case NODEEXISTS:
-              retry = true;
-              break;
-            default:
-              // if fail on error other than NodeExists
-              // will not retry
-              needUpdate[i] = false;
-              break;
+              case OK:
+                needUpdate[i] = false;
+                updateData.set(i, newDataList.get(i));
+                setStats.set(i, ZNode.ZERO_STAT);
+                break;
+              case NODEEXISTS:
+                retry = true;
+                break;
+              default:
+                // if fail on error other than NodeExists
+                // will not retry
+                needUpdate[i] = false;
+                break;
             }
           }
         }
@@ -1172,11 +1189,11 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
     } finally {
       long endT = System.nanoTime();
       if (LOG.isTraceEnabled()) {
-        LOG.trace("setData_async, size: " + paths.size() + ", paths: " + paths.get(0)
-            + ",... time: " + (endT - startT) + " ns");
+        LOG.trace(
+            "setData_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: " + (
+                endT - startT) + " ns");
       }
     }
-
   }
 
   /**
@@ -1209,7 +1226,8 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
     long startT = System.nanoTime();
 
     try {
-      ZkAsyncCallbacks.ExistsCallbackHandler[] cbList = new ZkAsyncCallbacks.ExistsCallbackHandler[paths.size()];
+      ZkAsyncCallbacks.ExistsCallbackHandler[] cbList =
+          new ZkAsyncCallbacks.ExistsCallbackHandler[paths.size()];
       for (int i = 0; i < paths.size(); i++) {
         String path = paths.get(i);
         cbList[i] = new ZkAsyncCallbacks.ExistsCallbackHandler();
@@ -1226,8 +1244,9 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
     } finally {
       long endT = System.nanoTime();
       if (LOG.isTraceEnabled()) {
-        LOG.trace("exists_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: "
-            + (endT - startT) + " ns");
+        LOG.trace(
+            "exists_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: " + (
+                endT - startT) + " ns");
       }
     }
   }
@@ -1243,7 +1262,8 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
 
     boolean[] success = new boolean[paths.size()];
 
-    ZkAsyncCallbacks.DeleteCallbackHandler[] cbList = new ZkAsyncCallbacks.DeleteCallbackHandler[paths.size()];
+    ZkAsyncCallbacks.DeleteCallbackHandler[] cbList =
+        new ZkAsyncCallbacks.DeleteCallbackHandler[paths.size()];
 
     long startT = System.nanoTime();
 
@@ -1264,8 +1284,9 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
     } finally {
       long endT = System.nanoTime();
       if (LOG.isTraceEnabled()) {
-        LOG.trace("delete_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: "
-            + (endT - startT) + " ns");
+        LOG.trace(
+            "delete_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: " + (
+                endT - startT) + " ns");
       }
     }
   }
@@ -1321,39 +1342,38 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
   }
 
   // TODO: refactor Builder class to remove duplicate code with other Helix Java APIs
-  public static class Builder {
+  public static class Builder<T> {
     private String zkAddress;
-    private RealmAwareZkClient.RealmMode realmMode;
     private ZkBaseDataAccessor.ZkClientType zkClientType;
+    private RealmAwareZkClient.RealmMode realmMode;
     private RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig;
     private RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig;
 
     public Builder() {
     }
 
-    public ZkBaseDataAccessor.Builder setZkAddress(String zkAddress) {
+    public Builder<T> setZkAddress(String zkAddress) {
       this.zkAddress = zkAddress;
       return this;
     }
 
-    public ZkBaseDataAccessor.Builder setRealmMode(RealmAwareZkClient.RealmMode realmMode) {
+    public Builder<T> setRealmMode(RealmAwareZkClient.RealmMode realmMode) {
       this.realmMode = realmMode;
       return this;
     }
 
-    public ZkBaseDataAccessor.Builder setZkClientType(
-        ZkBaseDataAccessor.ZkClientType zkClientType) {
+    public Builder<T> setZkClientType(ZkClientType zkClientType) {
       this.zkClientType = zkClientType;
       return this;
     }
 
-    public ZkBaseDataAccessor.Builder setRealmAwareZkConnectionConfig(
+    public Builder<T> setRealmAwareZkConnectionConfig(
         RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig) {
       this.realmAwareZkConnectionConfig = realmAwareZkConnectionConfig;
       return this;
     }
 
-    public ZkBaseDataAccessor.Builder setRealmAwareZkClientConfig(
+    public Builder<T> setRealmAwareZkClientConfig(
         RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig) {
       this.realmAwareZkClientConfig = realmAwareZkClientConfig;
       return this;
@@ -1362,11 +1382,11 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
     /**
      * Returns a <code>ZkBaseDataAccessor</code> instance.
      * <p>
-     * Note: in multi-realm mode, if and only if ZK client type is set to <code>FEDERATED</code>,
-     * <code>ZkBaseDataAccessor</code> can access to multi-realm. Otherwise, it can only access to
-     * single-ream.
+     * Note: ZK client type must be set to <code>FEDERATED</code> in order for
+     * <code>ZkBaseDataAccessor</code> can access multiple ZKs. Otherwise, it can only access
+     * single-ZK.
      */
-    public ZkBaseDataAccessor<?> build() {
+    public ZkBaseDataAccessor<T> build() {
       validate();
       return new ZkBaseDataAccessor<>(this);
     }
@@ -1381,28 +1401,24 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
 
       // If ZkClientType is set, RealmMode must either be single-realm or not set.
       if (isZkClientTypeSet && realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM) {
-        throw new HelixException(
-            "ZkClientType cannot be set on multi-realm mode!");
+        throw new HelixException("ZkClientType cannot be set on multi-realm mode!");
       }
-      // If ZkClientType is not set, default to SHARED
-      if (!isZkClientTypeSet) {
-        zkClientType = ZkBaseDataAccessor.ZkClientType.SHARED;
+      // If ZkClientType is not set and realmMode is single-realm, default to SHARED
+      if (!isZkClientTypeSet && realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM) {
+        zkClientType = ZkClientType.SHARED;
       }
 
       if (realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM && !isZkAddressSet) {
-        throw new HelixException(
-            "RealmMode cannot be single-realm without a valid ZkAddress set!");
+        throw new HelixException("RealmMode cannot be single-realm without a valid ZkAddress set!");
       }
 
       if (realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM && isZkAddressSet) {
-        throw new HelixException(
-            "ZkAddress cannot be set on multi-realm mode!");
+        throw new HelixException("ZkAddress cannot be set on multi-realm mode!");
       }
 
       if (realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM
           && zkClientType == ZkClientType.FEDERATED) {
-        throw new HelixException(
-            "FederatedZkClient cannot be set on single-realm mode!");
+        throw new HelixException("FederatedZkClient cannot be set on single-realm mode!");
       }
 
       if (realmMode == null) {
@@ -1412,7 +1428,8 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
 
       // Resolve RealmAwareZkClientConfig
       if (realmAwareZkClientConfig == null) {
-        realmAwareZkClientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig();
+        realmAwareZkClientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig()
+            .setZkSerializer(new ZNRecordSerializer());
       }
 
       // Resolve RealmAwareZkConnectionConfig