You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/04/01 22:47:35 UTC
[helix] 40/49: Reformat ZkBaseDataAccessor (#893)
This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch zooscalability
in repository https://gitbox.apache.org/repos/asf/helix.git
commit c3bdbae9930e9f4086408243ddd5835303d29aab
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Fri Mar 13 17:19:34 2020 -0700
Reformat ZkBaseDataAccessor (#893)
Changelist:
1. Add generic type markers to Builder (<T>)
2. Fix a bug in validate function
3. Default to ZNRecordSerializer to preserve existing behavior
---
.../helix/manager/zk/ZkBaseDataAccessor.java | 301 +++++++++++----------
1 file changed, 159 insertions(+), 142 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index 1d60c7b..32f33f8 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -52,6 +52,7 @@ import org.apache.helix.zookeeper.zkclient.exception.ZkException;
import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.data.Stat;
@@ -59,6 +60,7 @@ import org.apache.zookeeper.server.DataTree;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
// Designates which mode ZkBaseDataAccessor should be created in. If not specified, it will be
@@ -139,7 +141,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
_usesExternalZkClient = true;
}
- private ZkBaseDataAccessor(Builder builder) {
+ private ZkBaseDataAccessor(Builder<T> builder) {
switch (builder.realmMode) {
case MULTI_REALM:
try {
@@ -397,16 +399,16 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
result._pathCreated.addAll(res._pathCreated);
RetCode rc = res._retCode;
switch (rc) {
- case OK:
- // not set stat if node is created (instead of set)
- break;
- case NODE_EXISTS:
- retry = true;
- break;
- default:
- LOG.error("Fail to set path by creating: " + path);
- result._retCode = RetCode.ERROR;
- return result;
+ case OK:
+ // not set stat if node is created (instead of set)
+ break;
+ case NODE_EXISTS:
+ retry = true;
+ break;
+ default:
+ LOG.error("Fail to set path by creating: " + path);
+ result._retCode = RetCode.ERROR;
+ return result;
}
} catch (Exception e1) {
LOG.error("Exception while setting path by creating: " + path, e);
@@ -477,16 +479,16 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
rc = RetCode.OK;
}
switch (rc) {
- case OK:
- updatedData = newData;
- break;
- case NODE_EXISTS:
- retry = true;
- break;
- default:
- LOG.error("Fail to update path by creating: " + path);
- result._retCode = RetCode.ERROR;
- return result;
+ case OK:
+ updatedData = newData;
+ break;
+ case NODE_EXISTS:
+ retry = true;
+ break;
+ default:
+ LOG.error("Fail to update path by creating: " + path);
+ result._retCode = RetCode.ERROR;
+ return result;
}
} catch (Exception e1) {
LOG.error("Exception while updating path by creating: " + path, e1);
@@ -553,17 +555,19 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
// init stats
if (stats != null) {
stats.clear();
- stats.addAll(Collections.<Stat> nCopies(paths.size(), null));
+ stats.addAll(Collections.<Stat>nCopies(paths.size(), null));
}
long startT = System.nanoTime();
try {
// issue asyn get requests
- ZkAsyncCallbacks.GetDataCallbackHandler[] cbList = new ZkAsyncCallbacks.GetDataCallbackHandler[paths.size()];
+ ZkAsyncCallbacks.GetDataCallbackHandler[] cbList =
+ new ZkAsyncCallbacks.GetDataCallbackHandler[paths.size()];
for (int i = 0; i < paths.size(); i++) {
- if (!needRead[i])
+ if (!needRead[i]) {
continue;
+ }
String path = paths.get(i);
cbList[i] = new ZkAsyncCallbacks.GetDataCallbackHandler();
@@ -572,19 +576,21 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
// wait for completion
for (int i = 0; i < cbList.length; i++) {
- if (!needRead[i])
+ if (!needRead[i]) {
continue;
+ }
ZkAsyncCallbacks.GetDataCallbackHandler cb = cbList[i];
cb.waitForSuccess();
}
// construct return results
- List<T> records = new ArrayList<T>(Collections.<T> nCopies(paths.size(), null));
+ List<T> records = new ArrayList<T>(Collections.<T>nCopies(paths.size(), null));
Map<String, Integer> pathFailToRead = new HashMap<>();
for (int i = 0; i < paths.size(); i++) {
- if (!needRead[i])
+ if (!needRead[i]) {
continue;
+ }
ZkAsyncCallbacks.GetDataCallbackHandler cb = cbList[i];
if (Code.get(cb.getRc()) == Code.OK) {
@@ -610,8 +616,9 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
} finally {
long endT = System.nanoTime();
if (LOG.isTraceEnabled()) {
- LOG.trace("getData_async, size: " + paths.size() + ", paths: " + paths.get(0)
- + ",... time: " + (endT - startT) + " ns");
+ LOG.trace(
+ "getData_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: " + (
+ endT - startT) + " ns");
}
}
}
@@ -752,15 +759,16 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
/**
* async create. give up on error other than NONODE
*/
- ZkAsyncCallbacks.CreateCallbackHandler[] create(List<String> paths, List<T> records, boolean[] needCreate,
- List<List<String>> pathsCreated, int options) {
+ ZkAsyncCallbacks.CreateCallbackHandler[] create(List<String> paths, List<T> records,
+ boolean[] needCreate, List<List<String>> pathsCreated, int options) {
if ((records != null && records.size() != paths.size()) || needCreate.length != paths.size()
|| (pathsCreated != null && pathsCreated.size() != paths.size())) {
throw new IllegalArgumentException(
"paths, records, needCreate, and pathsCreated should be of same size");
}
- ZkAsyncCallbacks.CreateCallbackHandler[] cbList = new ZkAsyncCallbacks.CreateCallbackHandler[paths.size()];
+ ZkAsyncCallbacks.CreateCallbackHandler[] cbList =
+ new ZkAsyncCallbacks.CreateCallbackHandler[paths.size()];
CreateMode mode = AccessOption.getMode(options);
if (mode == null) {
@@ -773,8 +781,9 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
retry = false;
for (int i = 0; i < paths.size(); i++) {
- if (!needCreate[i])
+ if (!needCreate[i]) {
continue;
+ }
String path = paths.get(i);
T record = records == null ? null : records.get(i);
@@ -782,12 +791,13 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
_zkClient.asyncCreate(path, record, mode, cbList[i]);
}
- List<String> parentPaths = new ArrayList<>(Collections.<String> nCopies(paths.size(), null));
+ List<String> parentPaths = new ArrayList<>(Collections.<String>nCopies(paths.size(), null));
boolean failOnNoNode = false;
for (int i = 0; i < paths.size(); i++) {
- if (!needCreate[i])
+ if (!needCreate[i]) {
continue;
+ }
ZkAsyncCallbacks.CreateCallbackHandler cb = cbList[i];
cb.waitForSuccess();
@@ -819,8 +829,9 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
create(parentPaths, null, needCreateParent, pathsCreated, AccessOption.PERSISTENT);
for (int i = 0; i < parentCbList.length; i++) {
ZkAsyncCallbacks.CreateCallbackHandler parentCb = parentCbList[i];
- if (parentCb == null)
+ if (parentCb == null) {
continue;
+ }
Code rc = Code.get(parentCb.getRc());
@@ -853,12 +864,13 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
boolean[] needCreate = new boolean[paths.size()];
Arrays.fill(needCreate, true);
List<List<String>> pathsCreated =
- new ArrayList<>(Collections.<List<String>> nCopies(paths.size(), null));
+ new ArrayList<>(Collections.<List<String>>nCopies(paths.size(), null));
long startT = System.nanoTime();
try {
- ZkAsyncCallbacks.CreateCallbackHandler[] cbList = create(paths, records, needCreate, pathsCreated, options);
+ ZkAsyncCallbacks.CreateCallbackHandler[] cbList =
+ create(paths, records, needCreate, pathsCreated, options);
for (int i = 0; i < cbList.length; i++) {
ZkAsyncCallbacks.CreateCallbackHandler cb = cbList[i];
@@ -866,12 +878,12 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
}
return success;
-
} finally {
long endT = System.nanoTime();
if (LOG.isTraceEnabled()) {
- LOG.trace("create_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: "
- + (endT - startT) + " ns");
+ LOG.trace(
+ "create_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: " + (
+ endT - startT) + " ns");
}
}
}
@@ -894,8 +906,8 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
return new boolean[0];
}
- if ((records != null && records.size() != paths.size())
- || (pathsCreated != null && pathsCreated.size() != paths.size())) {
+ if ((records != null && records.size() != paths.size()) || (pathsCreated != null
+ && pathsCreated.size() != paths.size())) {
throw new IllegalArgumentException("paths, records, and pathsCreated should be of same size");
}
@@ -907,8 +919,9 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
return success;
}
- List<Stat> setStats = new ArrayList<>(Collections.<Stat> nCopies(paths.size(), null));
- ZkAsyncCallbacks.SetDataCallbackHandler[] cbList = new ZkAsyncCallbacks.SetDataCallbackHandler[paths.size()];
+ List<Stat> setStats = new ArrayList<>(Collections.<Stat>nCopies(paths.size(), null));
+ ZkAsyncCallbacks.SetDataCallbackHandler[] cbList =
+ new ZkAsyncCallbacks.SetDataCallbackHandler[paths.size()];
ZkAsyncCallbacks.CreateCallbackHandler[] createCbList = null;
boolean[] needSet = new boolean[paths.size()];
Arrays.fill(needSet, true);
@@ -921,14 +934,14 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
retry = false;
for (int i = 0; i < paths.size(); i++) {
- if (!needSet[i])
+ if (!needSet[i]) {
continue;
+ }
String path = paths.get(i);
T record = records.get(i);
cbList[i] = new ZkAsyncCallbacks.SetDataCallbackHandler();
_zkClient.asyncSetData(path, record, -1, cbList[i]);
-
}
boolean failOnNoNode = false;
@@ -938,18 +951,18 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
cb.waitForSuccess();
Code rc = Code.get(cb.getRc());
switch (rc) {
- case OK:
- setStats.set(i, cb.getStat());
- needSet[i] = false;
- break;
- case NONODE:
- // if fail on NoNode, try create the node
- failOnNoNode = true;
- break;
- default:
- // if fail on error other than NoNode, give up
- needSet[i] = false;
- break;
+ case OK:
+ setStats.set(i, cb.getStat());
+ needSet[i] = false;
+ break;
+ case NONODE:
+ // if fail on NoNode, try create the node
+ failOnNoNode = true;
+ break;
+ default:
+ // if fail on error other than NoNode, give up
+ needSet[i] = false;
+ break;
}
}
@@ -965,18 +978,18 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
Code rc = Code.get(createCb.getRc());
switch (rc) {
- case OK:
- setStats.set(i, ZNode.ZERO_STAT);
- needSet[i] = false;
- break;
- case NODEEXISTS:
- retry = true;
- break;
- default:
- // if creation fails on error other than NodeExists
- // no need to retry set
- needSet[i] = false;
- break;
+ case OK:
+ setStats.set(i, ZNode.ZERO_STAT);
+ needSet[i] = false;
+ break;
+ case NODEEXISTS:
+ retry = true;
+ break;
+ default:
+ // if creation fails on error other than NodeExists
+ // no need to retry set
+ needSet[i] = false;
+ break;
}
}
}
@@ -1006,13 +1019,15 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
} finally {
long endT = System.nanoTime();
if (LOG.isTraceEnabled()) {
- LOG.trace("setData_async, size: " + paths.size() + ", paths: " + paths.get(0)
- + ",... time: " + (endT - startT) + " ns");
+ LOG.trace(
+ "setData_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: " + (
+ endT - startT) + " ns");
}
}
}
// TODO: rename to update
+
/**
* async update
*/
@@ -1039,14 +1054,14 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
return Collections.emptyList();
}
- if (updaters.size() != paths.size()
- || (pathsCreated != null && pathsCreated.size() != paths.size())) {
+ if (updaters.size() != paths.size() || (pathsCreated != null && pathsCreated.size() != paths
+ .size())) {
throw new IllegalArgumentException(
"paths, updaters, and pathsCreated should be of same size");
}
- List<Stat> setStats = new ArrayList<Stat>(Collections.<Stat> nCopies(paths.size(), null));
- List<T> updateData = new ArrayList<T>(Collections.<T> nCopies(paths.size(), null));
+ List<Stat> setStats = new ArrayList<Stat>(Collections.<Stat>nCopies(paths.size(), null));
+ List<T> updateData = new ArrayList<T>(Collections.<T>nCopies(paths.size(), null));
CreateMode mode = AccessOption.getMode(options);
if (mode == null) {
@@ -1054,7 +1069,8 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
return updateData;
}
- ZkAsyncCallbacks.SetDataCallbackHandler[] cbList = new ZkAsyncCallbacks.SetDataCallbackHandler[paths.size()];
+ ZkAsyncCallbacks.SetDataCallbackHandler[] cbList =
+ new ZkAsyncCallbacks.SetDataCallbackHandler[paths.size()];
ZkAsyncCallbacks.CreateCallbackHandler[] createCbList = null;
boolean[] needUpdate = new boolean[paths.size()];
Arrays.fill(needUpdate, true);
@@ -1104,29 +1120,30 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
for (int i = 0; i < paths.size(); i++) {
ZkAsyncCallbacks.SetDataCallbackHandler cb = cbList[i];
- if (cb == null)
+ if (cb == null) {
continue;
+ }
cb.waitForSuccess();
switch (Code.get(cb.getRc())) {
- case OK:
- updateData.set(i, newDataList.get(i));
- setStats.set(i, cb.getStat());
- needUpdate[i] = false;
- break;
- case NONODE:
- failOnNoNode = true;
- needCreate[i] = true;
- break;
- case BADVERSION:
- failOnBadVersion = true;
- break;
- default:
- // if fail on error other than NoNode or BadVersion
- // will not retry
- needUpdate[i] = false;
- break;
+ case OK:
+ updateData.set(i, newDataList.get(i));
+ setStats.set(i, cb.getStat());
+ needUpdate[i] = false;
+ break;
+ case NONODE:
+ failOnNoNode = true;
+ needCreate[i] = true;
+ break;
+ case BADVERSION:
+ failOnBadVersion = true;
+ break;
+ default:
+ // if fail on error other than NoNode or BadVersion
+ // will not retry
+ needUpdate[i] = false;
+ break;
}
}
@@ -1140,19 +1157,19 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
}
switch (Code.get(createCb.getRc())) {
- case OK:
- needUpdate[i] = false;
- updateData.set(i, newDataList.get(i));
- setStats.set(i, ZNode.ZERO_STAT);
- break;
- case NODEEXISTS:
- retry = true;
- break;
- default:
- // if fail on error other than NodeExists
- // will not retry
- needUpdate[i] = false;
- break;
+ case OK:
+ needUpdate[i] = false;
+ updateData.set(i, newDataList.get(i));
+ setStats.set(i, ZNode.ZERO_STAT);
+ break;
+ case NODEEXISTS:
+ retry = true;
+ break;
+ default:
+ // if fail on error other than NodeExists
+ // will not retry
+ needUpdate[i] = false;
+ break;
}
}
}
@@ -1172,11 +1189,11 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
} finally {
long endT = System.nanoTime();
if (LOG.isTraceEnabled()) {
- LOG.trace("setData_async, size: " + paths.size() + ", paths: " + paths.get(0)
- + ",... time: " + (endT - startT) + " ns");
+ LOG.trace(
+ "setData_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: " + (
+ endT - startT) + " ns");
}
}
-
}
/**
@@ -1209,7 +1226,8 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
long startT = System.nanoTime();
try {
- ZkAsyncCallbacks.ExistsCallbackHandler[] cbList = new ZkAsyncCallbacks.ExistsCallbackHandler[paths.size()];
+ ZkAsyncCallbacks.ExistsCallbackHandler[] cbList =
+ new ZkAsyncCallbacks.ExistsCallbackHandler[paths.size()];
for (int i = 0; i < paths.size(); i++) {
String path = paths.get(i);
cbList[i] = new ZkAsyncCallbacks.ExistsCallbackHandler();
@@ -1226,8 +1244,9 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
} finally {
long endT = System.nanoTime();
if (LOG.isTraceEnabled()) {
- LOG.trace("exists_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: "
- + (endT - startT) + " ns");
+ LOG.trace(
+ "exists_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: " + (
+ endT - startT) + " ns");
}
}
}
@@ -1243,7 +1262,8 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
boolean[] success = new boolean[paths.size()];
- ZkAsyncCallbacks.DeleteCallbackHandler[] cbList = new ZkAsyncCallbacks.DeleteCallbackHandler[paths.size()];
+ ZkAsyncCallbacks.DeleteCallbackHandler[] cbList =
+ new ZkAsyncCallbacks.DeleteCallbackHandler[paths.size()];
long startT = System.nanoTime();
@@ -1264,8 +1284,9 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
} finally {
long endT = System.nanoTime();
if (LOG.isTraceEnabled()) {
- LOG.trace("delete_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: "
- + (endT - startT) + " ns");
+ LOG.trace(
+ "delete_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: " + (
+ endT - startT) + " ns");
}
}
}
@@ -1321,39 +1342,38 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
}
// TODO: refactor Builder class to remove duplicate code with other Helix Java APIs
- public static class Builder {
+ public static class Builder<T> {
private String zkAddress;
- private RealmAwareZkClient.RealmMode realmMode;
private ZkBaseDataAccessor.ZkClientType zkClientType;
+ private RealmAwareZkClient.RealmMode realmMode;
private RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig;
private RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig;
public Builder() {
}
- public ZkBaseDataAccessor.Builder setZkAddress(String zkAddress) {
+ public Builder<T> setZkAddress(String zkAddress) {
this.zkAddress = zkAddress;
return this;
}
- public ZkBaseDataAccessor.Builder setRealmMode(RealmAwareZkClient.RealmMode realmMode) {
+ public Builder<T> setRealmMode(RealmAwareZkClient.RealmMode realmMode) {
this.realmMode = realmMode;
return this;
}
- public ZkBaseDataAccessor.Builder setZkClientType(
- ZkBaseDataAccessor.ZkClientType zkClientType) {
+ public Builder<T> setZkClientType(ZkClientType zkClientType) {
this.zkClientType = zkClientType;
return this;
}
- public ZkBaseDataAccessor.Builder setRealmAwareZkConnectionConfig(
+ public Builder<T> setRealmAwareZkConnectionConfig(
RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig) {
this.realmAwareZkConnectionConfig = realmAwareZkConnectionConfig;
return this;
}
- public ZkBaseDataAccessor.Builder setRealmAwareZkClientConfig(
+ public Builder<T> setRealmAwareZkClientConfig(
RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig) {
this.realmAwareZkClientConfig = realmAwareZkClientConfig;
return this;
@@ -1362,11 +1382,11 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
/**
* Returns a <code>ZkBaseDataAccessor</code> instance.
* <p>
- * Note: in multi-realm mode, if and only if ZK client type is set to <code>FEDERATED</code>,
- * <code>ZkBaseDataAccessor</code> can access to multi-realm. Otherwise, it can only access to
- * single-ream.
+ * Note: ZK client type must be set to <code>FEDERATED</code> in order for
+ * <code>ZkBaseDataAccessor</code> can access multiple ZKs. Otherwise, it can only access
+ * single-ZK.
*/
- public ZkBaseDataAccessor<?> build() {
+ public ZkBaseDataAccessor<T> build() {
validate();
return new ZkBaseDataAccessor<>(this);
}
@@ -1381,28 +1401,24 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
// If ZkClientType is set, RealmMode must either be single-realm or not set.
if (isZkClientTypeSet && realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM) {
- throw new HelixException(
- "ZkClientType cannot be set on multi-realm mode!");
+ throw new HelixException("ZkClientType cannot be set on multi-realm mode!");
}
- // If ZkClientType is not set, default to SHARED
- if (!isZkClientTypeSet) {
- zkClientType = ZkBaseDataAccessor.ZkClientType.SHARED;
+ // If ZkClientType is not set and realmMode is single-realm, default to SHARED
+ if (!isZkClientTypeSet && realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM) {
+ zkClientType = ZkClientType.SHARED;
}
if (realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM && !isZkAddressSet) {
- throw new HelixException(
- "RealmMode cannot be single-realm without a valid ZkAddress set!");
+ throw new HelixException("RealmMode cannot be single-realm without a valid ZkAddress set!");
}
if (realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM && isZkAddressSet) {
- throw new HelixException(
- "ZkAddress cannot be set on multi-realm mode!");
+ throw new HelixException("ZkAddress cannot be set on multi-realm mode!");
}
if (realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM
&& zkClientType == ZkClientType.FEDERATED) {
- throw new HelixException(
- "FederatedZkClient cannot be set on single-realm mode!");
+ throw new HelixException("FederatedZkClient cannot be set on single-realm mode!");
}
if (realmMode == null) {
@@ -1412,7 +1428,8 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
// Resolve RealmAwareZkClientConfig
if (realmAwareZkClientConfig == null) {
- realmAwareZkClientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig();
+ realmAwareZkClientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig()
+ .setZkSerializer(new ZNRecordSerializer());
}
// Resolve RealmAwareZkConnectionConfig