You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/04/08 22:54:08 UTC
[helix] 38/50: Make ZkBaseDataAccessor realm-aware (#855)
This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch zooscalability_merge
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 8aebcf42975d8fecbe8698ab5db23851a4b9147e
Author: Huizhi Lu <ih...@gmail.com>
AuthorDate: Wed Mar 11 23:38:52 2020 -0700
Make ZkBaseDataAccessor realm-aware (#855)
This commit makes ZkBaseDataAccessor realm-aware by building the corresponding realm-aware ZkClients in the constructor. A Builder is provided to set the realm-aware client config and connection config.
---
.../helix/manager/zk/ZkBaseDataAccessor.java | 261 ++++++++++++++++++---
1 file changed, 233 insertions(+), 28 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index f287c22..1d60c7b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -19,6 +19,7 @@ package org.apache.helix.manager.zk;
* under the License.
*/
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -26,16 +27,20 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixException;
+import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.api.exceptions.HelixMetaDataAccessException;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.store.zk.ZNode;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
@@ -60,21 +65,23 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
// created on SHARED mode.
// TODO: move this to RealmAwareZkClient
public enum ZkClientType {
- /*
+ /**
* When ZkBaseDataAccessor is created with the DEDICATED type, it supports ephemeral node
* creation, callback functionality, and session management. But note that this is more
* resource-heavy since it creates a dedicated ZK connection so should be used sparingly only
* when the aforementioned features are needed.
*/
DEDICATED,
- /*
+
+ /**
* When ZkBaseDataAccessor is created with the SHARED type, it only supports CRUD
* functionalities. This will be the default mode of creation.
*/
SHARED,
- /*
+
+ /**
* Uses FederatedZkClient (applicable on multi-realm mode only) that queries Metadata Store
- * Directory Service for routing data
+ * Directory Service for routing data.
*/
FEDERATED
}
@@ -116,6 +123,13 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
// ZkClient
private final boolean _usesExternalZkClient;
+ /**
+ * @deprecated it is recommended to use the builder constructor {@link Builder}
+ * instead to avoid having to manually create and maintain a RealmAwareZkClient
+ * outside of ZkBaseDataAccessor.
+ *
+ * @param zkClient A created RealmAwareZkClient
+ */
@Deprecated
public ZkBaseDataAccessor(RealmAwareZkClient zkClient) {
if (zkClient == null) {
@@ -125,13 +139,63 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
_usesExternalZkClient = true;
}
+ private ZkBaseDataAccessor(Builder builder) {
+ switch (builder.realmMode) {
+ case MULTI_REALM:
+ try {
+ if (builder.zkClientType == ZkClientType.DEDICATED) {
+ // Use a realm-aware dedicated zk client
+ _zkClient = DedicatedZkClientFactory.getInstance()
+ .buildZkClient(builder.realmAwareZkConnectionConfig,
+ builder.realmAwareZkClientConfig);
+ } else if (builder.zkClientType == ZkClientType.SHARED) {
+ // Use a realm-aware shared zk client
+ _zkClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(builder.realmAwareZkConnectionConfig,
+ builder.realmAwareZkClientConfig);
+ } else {
+ _zkClient = new FederatedZkClient(builder.realmAwareZkConnectionConfig,
+ builder.realmAwareZkClientConfig);
+ }
+ } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+ throw new HelixException("Not able to connect on multi-realm mode.", e);
+ }
+ break;
+
+ case SINGLE_REALM:
+ // Create a HelixZkClient: Use a SharedZkClient because ZkBaseDataAccessor does not need to
+ // do ephemeral operations.
+ if (builder.zkClientType == ZkClientType.DEDICATED) {
+ // If DEDICATED, then we use a dedicated HelixZkClient because we must support ephemeral
+ // operations
+ _zkClient = DedicatedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(builder.zkAddress),
+ builder.realmAwareZkClientConfig.createHelixZkClientConfig());
+ } else {
+ _zkClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(builder.zkAddress),
+ builder.realmAwareZkClientConfig.createHelixZkClientConfig());
+ _zkClient
+ .waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+ break;
+ default:
+ throw new HelixException("Invalid RealmMode given: " + builder.realmMode);
+ }
+
+ _usesExternalZkClient = false;
+ }
+
/**
* The ZkBaseDataAccessor with custom serializer support of ZkSerializer type.
* Note: This constructor will use a shared ZkConnection.
* Do NOT use this for ephemeral node creation/callbacks/session management.
* Do use this for simple CRUD operations to ZooKeeper.
* @param zkAddress The zookeeper address
+ *
+ * @deprecated it is recommended to use the builder constructor {@link Builder}
*/
+ @Deprecated
public ZkBaseDataAccessor(String zkAddress, ZkSerializer zkSerializer) {
this(zkAddress, zkSerializer, ZkClientType.SHARED);
}
@@ -142,7 +206,10 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
* Do NOT use this for ephemeral node creation/callbacks/session management.
* Do use this for simple CRUD operations to ZooKeeper.
* @param zkAddress The zookeeper address
+ *
+ * @deprecated it is recommended to use the builder constructor {@link Builder}
*/
+ @Deprecated
public ZkBaseDataAccessor(String zkAddress, PathBasedZkSerializer pathBasedZkSerializer) {
this(zkAddress, pathBasedZkSerializer, ZkClientType.SHARED);
}
@@ -153,7 +220,10 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
* Does NOT support ephemeral node creation, callbacks, or session management.
* Uses {@link ZNRecordSerializer} serializer
* @param zkAddress The zookeeper address
+ *
+ * @deprecated it is recommended to use the builder constructor {@link Builder}
*/
+ @Deprecated
public ZkBaseDataAccessor(String zkAddress) {
this(zkAddress, new ZNRecordSerializer());
}
@@ -166,7 +236,10 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
* CRUD operations to ZooKeeper.
* @param zkAddress
* @param zkClientType
+ *
+ * @deprecated it is recommended to use the builder constructor {@link Builder}
*/
+ @Deprecated
public ZkBaseDataAccessor(String zkAddress, ZkClientType zkClientType) {
this(zkAddress, new ZNRecordSerializer(), zkClientType);
}
@@ -179,21 +252,16 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
* CRUD operations to ZooKeeper.
* @param zkAddress
* @param zkSerializer
+ *
+ * @deprecated it is recommended to use the builder constructor {@link Builder}
*/
+ @Deprecated
public ZkBaseDataAccessor(String zkAddress, ZkSerializer zkSerializer,
ZkClientType zkClientType) {
- switch (zkClientType) {
- case DEDICATED:
- _zkClient = DedicatedZkClientFactory.getInstance().buildZkClient(
- new HelixZkClient.ZkConnectionConfig(zkAddress),
- new HelixZkClient.ZkClientConfig().setZkSerializer(zkSerializer));
- break;
- case SHARED:
- default:
- _zkClient = SharedZkClientFactory.getInstance().buildZkClient(
- new HelixZkClient.ZkConnectionConfig(zkAddress),
- new HelixZkClient.ZkClientConfig().setZkSerializer(zkSerializer));
- }
+ RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
+ new RealmAwareZkClient.RealmAwareZkClientConfig().setZkSerializer(zkSerializer);
+
+ _zkClient = buildRealmAwareZkClient(clientConfig, zkAddress, zkClientType);
_usesExternalZkClient = false;
}
@@ -206,21 +274,16 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
* @param zkAddress
* @param pathBasedZkSerializer
* @param zkClientType
+ *
+ * @deprecated it is recommended to use the builder constructor {@link Builder}
*/
+ @Deprecated
public ZkBaseDataAccessor(String zkAddress, PathBasedZkSerializer pathBasedZkSerializer,
ZkClientType zkClientType) {
- switch (zkClientType) {
- case DEDICATED:
- _zkClient = DedicatedZkClientFactory.getInstance().buildZkClient(
- new HelixZkClient.ZkConnectionConfig(zkAddress),
- new HelixZkClient.ZkClientConfig().setZkSerializer(pathBasedZkSerializer));
- break;
- case SHARED:
- default:
- _zkClient = SharedZkClientFactory.getInstance().buildZkClient(
- new HelixZkClient.ZkConnectionConfig(zkAddress),
- new HelixZkClient.ZkClientConfig().setZkSerializer(pathBasedZkSerializer));
- }
+ RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
+ new RealmAwareZkClient.RealmAwareZkClientConfig().setZkSerializer(pathBasedZkSerializer);
+
+ _zkClient = buildRealmAwareZkClient(clientConfig, zkAddress, zkClientType);
_usesExternalZkClient = false;
}
@@ -1256,4 +1319,146 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
_zkClient.close();
}
}
+
+ // TODO: refactor Builder class to remove duplicate code with other Helix Java APIs
+ public static class Builder {
+ private String zkAddress;
+ private RealmAwareZkClient.RealmMode realmMode;
+ private ZkBaseDataAccessor.ZkClientType zkClientType;
+ private RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig;
+ private RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig;
+
+ public Builder() {
+ }
+
+ public ZkBaseDataAccessor.Builder setZkAddress(String zkAddress) {
+ this.zkAddress = zkAddress;
+ return this;
+ }
+
+ public ZkBaseDataAccessor.Builder setRealmMode(RealmAwareZkClient.RealmMode realmMode) {
+ this.realmMode = realmMode;
+ return this;
+ }
+
+ public ZkBaseDataAccessor.Builder setZkClientType(
+ ZkBaseDataAccessor.ZkClientType zkClientType) {
+ this.zkClientType = zkClientType;
+ return this;
+ }
+
+ public ZkBaseDataAccessor.Builder setRealmAwareZkConnectionConfig(
+ RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig) {
+ this.realmAwareZkConnectionConfig = realmAwareZkConnectionConfig;
+ return this;
+ }
+
+ public ZkBaseDataAccessor.Builder setRealmAwareZkClientConfig(
+ RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig) {
+ this.realmAwareZkClientConfig = realmAwareZkClientConfig;
+ return this;
+ }
+
+ /**
+ * Returns a <code>ZkBaseDataAccessor</code> instance.
+ * <p>
+ * Note: in multi-realm mode, if and only if ZK client type is set to <code>FEDERATED</code>,
+ * <code>ZkBaseDataAccessor</code> can access multiple realms. Otherwise, it can only access a
+ * single realm.
+ */
+ public ZkBaseDataAccessor<?> build() {
+ validate();
+ return new ZkBaseDataAccessor<>(this);
+ }
+
+ /*
+ * Validates the given parameters before building an instance of ZkBaseDataAccessor.
+ */
+ private void validate() {
+ // Resolve RealmMode based on other parameters
+ boolean isZkAddressSet = zkAddress != null && !zkAddress.isEmpty();
+ boolean isZkClientTypeSet = zkClientType != null;
+
+ // If ZkClientType is set, RealmMode must either be single-realm or not set.
+ if (isZkClientTypeSet && realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM) {
+ throw new HelixException(
+ "ZkClientType cannot be set on multi-realm mode!");
+ }
+ // If ZkClientType is not set, default to SHARED
+ if (!isZkClientTypeSet) {
+ zkClientType = ZkBaseDataAccessor.ZkClientType.SHARED;
+ }
+
+ if (realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM && !isZkAddressSet) {
+ throw new HelixException(
+ "RealmMode cannot be single-realm without a valid ZkAddress set!");
+ }
+
+ if (realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM && isZkAddressSet) {
+ throw new HelixException(
+ "ZkAddress cannot be set on multi-realm mode!");
+ }
+
+ if (realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM
+ && zkClientType == ZkClientType.FEDERATED) {
+ throw new HelixException(
+ "FederatedZkClient cannot be set on single-realm mode!");
+ }
+
+ if (realmMode == null) {
+ realmMode = isZkAddressSet ? RealmAwareZkClient.RealmMode.SINGLE_REALM
+ : RealmAwareZkClient.RealmMode.MULTI_REALM;
+ }
+
+ // Resolve RealmAwareZkClientConfig
+ if (realmAwareZkClientConfig == null) {
+ realmAwareZkClientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig();
+ }
+
+ // Resolve RealmAwareZkConnectionConfig
+ if (realmAwareZkConnectionConfig == null) {
+ // If not set, create a default one
+ realmAwareZkConnectionConfig =
+ new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build();
+ }
+ }
+ }
+
+ /*
+ * This is used by the constructors that do not take a Builder as a parameter, which are
+ * kept for backward compatibility.
+ */
+ private RealmAwareZkClient buildRealmAwareZkClient(
+ RealmAwareZkClient.RealmAwareZkClientConfig clientConfig, String zkAddress,
+ ZkClientType zkClientType) {
+ if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+ try {
+ return new FederatedZkClient(
+ new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(), clientConfig);
+ } catch (IllegalStateException | IOException | InvalidRoutingDataException e) {
+ throw new HelixException("Not able to connect on multi-realm mode.", e);
+ }
+ }
+
+ RealmAwareZkClient zkClient;
+
+ switch (zkClientType) {
+ case DEDICATED:
+ zkClient = DedicatedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
+ clientConfig.createHelixZkClientConfig());
+ break;
+ case SHARED:
+ default:
+ zkClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
+ clientConfig.createHelixZkClientConfig());
+
+ zkClient
+ .waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+ break;
+ }
+
+ return zkClient;
+ }
}