You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/04/01 22:47:31 UTC
[helix] 36/49: Make ZkCacheBaseDataAccessor and
ZkHelixPropertyStore realm-aware (#863)
This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch zooscalability
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 8d472699042fb88ae2aa95ce712ed4cdab10f627
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Wed Mar 11 19:44:26 2020 -0700
Make ZkCacheBaseDataAccessor and ZkHelixPropertyStore realm-aware (#863)
This commit makes both ZkCacheBaseDataAccessor and ZkHelixPropertyStore realm-aware by choosing the appropriate realm-aware ZkClients in the constructor. Also, we add a Builder here to give users options to set Connection config and Client config.
Note that ZkHelixPropertyStore extends ZkCacheBaseDataAccessor, so no separate change is needed for it.
---
.../helix/manager/zk/ZkBaseDataAccessor.java | 11 +-
.../helix/manager/zk/ZkCacheBaseDataAccessor.java | 274 ++++++++++++++++++---
2 files changed, 243 insertions(+), 42 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index f08ba55..f287c22 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -58,6 +58,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
// Designates which mode ZkBaseDataAccessor should be created in. If not specified, it will be
// created on SHARED mode.
+ // TODO: move this to RealmAwareZkClient
public enum ZkClientType {
/*
* When ZkBaseDataAccessor is created with the DEDICATED type, it supports ephemeral node
@@ -70,7 +71,12 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
* When ZkBaseDataAccessor is created with the SHARED type, it only supports CRUD
* functionalities. This will be the default mode of creation.
*/
- SHARED
+ SHARED,
+ /*
+ * Uses FederatedZkClient (applicable on multi-realm mode only) that queries Metadata Store
+ * Directory Service for routing data
+ */
+ FEDERATED
}
enum RetCode {
@@ -104,7 +110,8 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
private static Logger LOG = LoggerFactory.getLogger(ZkBaseDataAccessor.class);
private final RealmAwareZkClient _zkClient;
- // true if ZkBaseDataAccessor was instantiated with a HelixZkClient, false otherwise
+
+ // true if ZkBaseDataAccessor was instantiated with a RealmAwareZkClient, false otherwise
// This is used for close() to determine how ZkBaseDataAccessor should close the underlying
// ZkClient
private final boolean _usesExternalZkClient;
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
index 6d2c5cf..bd05ea7 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
@@ -19,10 +19,10 @@ package org.apache.helix.manager.zk;
* under the License.
*/
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -31,12 +31,16 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixException;
+import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.manager.zk.ZkBaseDataAccessor.RetCode;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.store.HelixPropertyListener;
import org.apache.helix.store.HelixPropertyStore;
import org.apache.helix.store.zk.ZNode;
import org.apache.helix.util.PathUtils;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
@@ -58,7 +62,15 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
protected ZkCallbackCache<T> _zkCache;
final ZkBaseDataAccessor<T> _baseAccessor;
- final Map<String, Cache<T>> _cacheMap;
+
+  // TODO: need to make sure no overlap between wtCachePaths and zkCachePaths
+  // Keys are ordered by path depth (number of "/"-separated segments), so more general
+  // (i.e. shallower) prefixes come first. Paths of equal depth are tie-broken
+  // lexicographically: a TreeMap treats keys whose comparator result is 0 as the same key,
+  // so without the tie-break two distinct paths of the same depth would overwrite each other.
+  final Map<String, Cache<T>> _cacheMap = new TreeMap<>((o1, o2) -> {
+    int byDepth = Integer.compare(o1.split("/").length, o2.split("/").length);
+    return byDepth != 0 ? byDepth : o1.compareTo(o2);
+  });
final String _chrootPath;
final List<String> _wtCachePaths;
@@ -70,12 +82,14 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
private final ReentrantLock _eventLock = new ReentrantLock();
private ZkCacheEventThread _eventThread;
- private HelixZkClient _zkClient = null;
+ private RealmAwareZkClient _zkClient;
+ @Deprecated
public ZkCacheBaseDataAccessor(ZkBaseDataAccessor<T> baseAccessor, List<String> wtCachePaths) {
this(baseAccessor, null, wtCachePaths, null);
}
+ @Deprecated
public ZkCacheBaseDataAccessor(ZkBaseDataAccessor<T> baseAccessor, String chrootPath,
List<String> wtCachePaths, List<String> zkCachePaths) {
_baseAccessor = baseAccessor;
@@ -90,50 +104,62 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
_wtCachePaths = wtCachePaths;
_zkCachePaths = zkCachePaths;
- // TODO: need to make sure no overlap between wtCachePaths and zkCachePaths
- // TreeMap key is ordered by key string length, so more general (i.e. short) prefix
- // comes first
- _cacheMap = new TreeMap<>(new Comparator<String>() {
- @Override
- public int compare(String o1, String o2) {
- int len1 = o1.split("/").length;
- int len2 = o2.split("/").length;
- return len1 - len2;
- }
- });
-
start();
}
+ @Deprecated
public ZkCacheBaseDataAccessor(String zkAddress, ZkSerializer serializer, String chrootPath,
List<String> wtCachePaths, List<String> zkCachePaths) {
this(zkAddress, serializer, chrootPath, wtCachePaths, zkCachePaths, null, null,
ZkBaseDataAccessor.ZkClientType.SHARED);
}
+ @Deprecated
public ZkCacheBaseDataAccessor(String zkAddress, ZkSerializer serializer, String chrootPath,
List<String> wtCachePaths, List<String> zkCachePaths, String monitorType, String monitorkey) {
this(zkAddress, serializer, chrootPath, wtCachePaths, zkCachePaths, monitorType, monitorkey,
ZkBaseDataAccessor.ZkClientType.SHARED);
}
+ @Deprecated
public ZkCacheBaseDataAccessor(String zkAddress, ZkSerializer serializer, String chrootPath,
List<String> wtCachePaths, List<String> zkCachePaths, String monitorType, String monitorkey,
ZkBaseDataAccessor.ZkClientType zkClientType) {
- HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
- clientConfig.setZkSerializer(serializer).setMonitorType(monitorType).setMonitorKey(monitorkey);
- switch (zkClientType) {
- case DEDICATED:
- _zkClient = DedicatedZkClientFactory.getInstance().buildZkClient(
- new HelixZkClient.ZkConnectionConfig(zkAddress),
- new HelixZkClient.ZkClientConfig().setZkSerializer(serializer));
- break;
- case SHARED:
- default:
- _zkClient = SharedZkClientFactory.getInstance()
- .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), clientConfig);
- }
- _zkClient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+
+ // If the multi ZK config is enabled, use multi-realm mode with FederatedZkClient
+ if (Boolean.parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) {
+ try {
+ RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder =
+ new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
+ RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
+ new RealmAwareZkClient.RealmAwareZkClientConfig();
+ clientConfig.setZkSerializer(serializer).setMonitorType(monitorType)
+ .setMonitorKey(monitorkey);
+ // Use a federated zk client
+ _zkClient = new FederatedZkClient(connectionConfigBuilder.build(), clientConfig);
+ } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+ // Note: IllegalStateException is for HttpRoutingDataReader if MSDS endpoint cannot be
+ // found
+ throw new HelixException("Failed to create ZkCacheBaseDataAccessor!", e);
+ }
+ } else {
+ HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+ clientConfig.setZkSerializer(serializer).setMonitorType(monitorType)
+ .setMonitorKey(monitorkey);
+ switch (zkClientType) {
+ case DEDICATED:
+ _zkClient = DedicatedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
+ new HelixZkClient.ZkClientConfig().setZkSerializer(serializer));
+ break;
+ case SHARED:
+ default:
+ _zkClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), clientConfig);
+ }
+ _zkClient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+
_baseAccessor = new ZkBaseDataAccessor<>(_zkClient);
if (chrootPath == null || chrootPath.equals("/")) {
@@ -146,17 +172,67 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
_wtCachePaths = wtCachePaths;
_zkCachePaths = zkCachePaths;
- // TODO: need to make sure no overlap between wtCachePaths and zkCachePaths
- // TreeMap key is ordered by key string length, so more general (i.e. short) prefix
- // comes first
- _cacheMap = new TreeMap<>(new Comparator<String>() {
- @Override
- public int compare(String o1, String o2) {
- int len1 = o1.split("/").length;
- int len2 = o2.split("/").length;
- return len1 - len2;
- }
- });
+ start();
+ }
+
+  /**
+   * Constructor using a Builder that allows users to set connection and client configs.
+   * Selects the underlying ZkClient from the builder's realm mode and ZkClientType, wraps it in
+   * a {@link ZkBaseDataAccessor}, and then calls {@code start()}.
+   * @param builder builder whose realm mode, client type and configs have already been resolved
+   *          by {@code Builder#validate()}
+   * @throws HelixException if the multi-realm client cannot be created or the realm mode is not
+   *           recognized
+   */
+  private ZkCacheBaseDataAccessor(Builder builder) {
+    // NOTE(review): the String-based constructor special-cases a null or "/" chrootPath; here
+    // the builder's value is taken as-is - confirm that is intended.
+    _chrootPath = builder._chrootPath;
+    _wtCachePaths = builder._wtCachePaths;
+    _zkCachePaths = builder._zkCachePaths;
+
+    RealmAwareZkClient zkClient;
+    switch (builder._realmMode) {
+      case MULTI_REALM:
+        try {
+          if (builder._zkClientType == ZkBaseDataAccessor.ZkClientType.DEDICATED) {
+            // Use a realm-aware dedicated zk client
+            zkClient = DedicatedZkClientFactory.getInstance()
+                .buildZkClient(builder._realmAwareZkConnectionConfig,
+                    builder._realmAwareZkClientConfig);
+          } else if (builder._zkClientType == ZkBaseDataAccessor.ZkClientType.SHARED) {
+            // Use a realm-aware shared zk client
+            zkClient = SharedZkClientFactory.getInstance()
+                .buildZkClient(builder._realmAwareZkConnectionConfig,
+                    builder._realmAwareZkClientConfig);
+          } else {
+            // Use a federated zk client
+            zkClient = new FederatedZkClient(builder._realmAwareZkConnectionConfig,
+                builder._realmAwareZkClientConfig);
+          }
+          break; // Must break out of the switch statement here since zkClient has been created
+        } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+          // Note: IllegalStateException is for HttpRoutingDataReader if MSDS endpoint cannot be
+          // found
+          throw new HelixException("Failed to create ZkCacheBaseDataAccessor!", e);
+        }
+      case SINGLE_REALM:
+        switch (builder._zkClientType) {
+          case DEDICATED:
+            // If DEDICATED, then we use a dedicated HelixZkClient because we must support
+            // ephemeral operations
+            zkClient = DedicatedZkClientFactory.getInstance()
+                .buildZkClient(new HelixZkClient.ZkConnectionConfig(builder._zkAddress),
+                    builder._realmAwareZkClientConfig.createHelixZkClientConfig());
+            break;
+          case SHARED:
+          default:
+            zkClient = SharedZkClientFactory.getInstance()
+                .buildZkClient(new HelixZkClient.ZkConnectionConfig(builder._zkAddress),
+                    builder._realmAwareZkClientConfig.createHelixZkClientConfig());
+            zkClient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT,
+                TimeUnit.MILLISECONDS);
+            break;
+        }
+        // The breaks above only leave the inner switch. Without this break, control falls
+        // through to the default case and every single-realm build fails with
+        // "Invalid RealmMode given".
+        break;
+      default:
+        throw new HelixException("Invalid RealmMode given: " + builder._realmMode);
+    }
+
+    _zkClient = zkClient;
+    _baseAccessor = new ZkBaseDataAccessor<>(_zkClient);
     start();
   }
@@ -842,4 +918,122 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
_zkClient.close();
}
}
+
+  /**
+   * Builder for {@link ZkCacheBaseDataAccessor}. Callers may set the ZK address, realm mode,
+   * realm-aware connection/client configs, the ZkClientType, and the cache-specific parameters
+   * (chroot path, write-through cache paths, ZK cache paths). Unset values are resolved to
+   * defaults in {@link #validate()} when {@link #build()} is called.
+   */
+  public static class Builder {
+    private String _zkAddress;
+    private RealmAwareZkClient.RealmMode _realmMode;
+    private RealmAwareZkClient.RealmAwareZkConnectionConfig _realmAwareZkConnectionConfig;
+    private RealmAwareZkClient.RealmAwareZkClientConfig _realmAwareZkClientConfig;
+
+    /** ZkCacheBaseDataAccessor-specific parameters */
+    private String _chrootPath;
+    private List<String> _wtCachePaths;
+    private List<String> _zkCachePaths;
+    private ZkBaseDataAccessor.ZkClientType _zkClientType;
+
+    public Builder() {
+    }
+
+    /**
+     * Sets the ZK address. If no realm mode is set explicitly, a non-empty address makes the
+     * builder resolve to single-realm mode.
+     */
+    public Builder setZkAddress(String zkAddress) {
+      _zkAddress = zkAddress;
+      return this;
+    }
+
+    /**
+     * Sets the realm mode explicitly. If left unset, it is derived from whether a ZK address
+     * was given.
+     */
+    public Builder setRealmMode(RealmAwareZkClient.RealmMode realmMode) {
+      _realmMode = realmMode;
+      return this;
+    }
+
+    /** Sets the realm-aware connection config; a default one is built if left unset. */
+    public Builder setRealmAwareZkConnectionConfig(
+        RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig) {
+      _realmAwareZkConnectionConfig = realmAwareZkConnectionConfig;
+      return this;
+    }
+
+    /** Sets the realm-aware client config; a default one is created if left unset. */
+    public Builder setRealmAwareZkClientConfig(
+        RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig) {
+      _realmAwareZkClientConfig = realmAwareZkClientConfig;
+      return this;
+    }
+
+    public Builder setChrootPath(String chrootPath) {
+      _chrootPath = chrootPath;
+      return this;
+    }
+
+    public Builder setWtCachePaths(List<String> wtCachePaths) {
+      _wtCachePaths = wtCachePaths;
+      return this;
+    }
+
+    public Builder setZkCachePaths(List<String> zkCachePaths) {
+      _zkCachePaths = zkCachePaths;
+      return this;
+    }
+
+    /**
+     * Sets the ZkClientType. Setting it together with an explicit
+     * {@code RealmMode.MULTI_REALM} is rejected at build time, as is {@code FEDERATED} on
+     * single-realm mode. If it is not set, {@code SHARED} is used.
+     * @param zkClientType the client type to use
+     * @return this builder
+     */
+    public Builder setZkClientType(ZkBaseDataAccessor.ZkClientType zkClientType) {
+      _zkClientType = zkClientType;
+      return this;
+    }
+
+    /**
+     * Validates the builder state and creates the accessor.
+     * @param <T> the data type handled by the accessor; inferred from the call site, so
+     *          callers that assign the result to a raw or parameterized type both compile
+     * @return a new ZkCacheBaseDataAccessor
+     * @throws HelixException if the combination of parameters is invalid or the underlying
+     *           client cannot be created
+     */
+    public <T> ZkCacheBaseDataAccessor<T> build() {
+      validate();
+      return new ZkCacheBaseDataAccessor<T>(this);
+    }
+
+    /**
+     * Checks the parameter combination and fills in defaults for everything left unset.
+     * @throws HelixException if the parameters contradict each other
+     */
+    private void validate() {
+      // Resolve RealmMode based on other parameters
+      boolean isZkAddressSet = _zkAddress != null && !_zkAddress.isEmpty();
+      boolean isZkClientTypeSet = _zkClientType != null;
+
+      // If ZkClientType is set, RealmMode must either be single-realm or not set.
+      if (isZkClientTypeSet && _realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM) {
+        throw new HelixException(
+            "ZkCacheBaseDataAccessor: you cannot set ZkClientType on multi-realm mode!");
+      }
+      // If ZkClientType is not set, default to SHARED
+      if (!isZkClientTypeSet) {
+        _zkClientType = ZkBaseDataAccessor.ZkClientType.SHARED;
+      }
+
+      if (_realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM && !isZkAddressSet) {
+        throw new HelixException(
+            "ZkCacheBaseDataAccessor: RealmMode cannot be single-realm without a valid ZkAddress set!");
+      }
+
+      if (_realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM && isZkAddressSet) {
+        throw new HelixException(
+            "ZkCacheBaseDataAccessor: You cannot set the ZkAddress on multi-realm mode!");
+      }
+
+      // Resolve an unset RealmMode before checking FEDERATED, so that a single-realm mode
+      // implied by the ZK address is validated the same way as an explicit one.
+      if (_realmMode == null) {
+        _realmMode = isZkAddressSet ? RealmAwareZkClient.RealmMode.SINGLE_REALM
+            : RealmAwareZkClient.RealmMode.MULTI_REALM;
+      }
+
+      if (_realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM
+          && _zkClientType == ZkBaseDataAccessor.ZkClientType.FEDERATED) {
+        throw new HelixException(
+            "ZkCacheBaseDataAccessor: You cannot use FederatedZkClient on single-realm mode!");
+      }
+
+      // Resolve RealmAwareZkClientConfig
+      if (_realmAwareZkClientConfig == null) {
+        _realmAwareZkClientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig();
+      }
+
+      // Resolve RealmAwareZkConnectionConfig
+      if (_realmAwareZkConnectionConfig == null) {
+        // If not set, create a default one
+        _realmAwareZkConnectionConfig =
+            new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build();
+      }
+    }
+  }
}