You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/04/08 22:54:08 UTC

[helix] 38/50: Make ZkBaseDataAccessor realm-aware (#855)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch zooscalability_merge
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 8aebcf42975d8fecbe8698ab5db23851a4b9147e
Author: Huizhi Lu <ih...@gmail.com>
AuthorDate: Wed Mar 11 23:38:52 2020 -0700

    Make ZkBaseDataAccessor realm-aware (#855)
    
    This commit makes ZkBaseDataAccessor realm-aware by building the corresponding realm-aware ZkClients in the constructor. A Builder is provided to set the realm-aware client config and connection config.
---
 .../helix/manager/zk/ZkBaseDataAccessor.java       | 261 ++++++++++++++++++---
 1 file changed, 233 insertions(+), 28 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index f287c22..1d60c7b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -19,6 +19,7 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -26,16 +27,20 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixException;
+import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.api.exceptions.HelixMetaDataAccessException;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
 import org.apache.helix.store.zk.ZNode;
 import org.apache.helix.util.HelixUtil;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
 import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
 import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
@@ -60,21 +65,23 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
   // created on SHARED mode.
   // TODO: move this to RealmAwareZkClient
   public enum ZkClientType {
-    /*
+    /**
      * When ZkBaseDataAccessor is created with the DEDICATED type, it supports ephemeral node
      * creation, callback functionality, and session management. But note that this is more
      * resource-heavy since it creates a dedicated ZK connection so should be used sparingly only
      * when the aforementioned features are needed.
      */
     DEDICATED,
-    /*
+
+    /**
      * When ZkBaseDataAccessor is created with the SHARED type, it only supports CRUD
      * functionalities. This will be the default mode of creation.
      */
     SHARED,
-    /*
+
+    /**
      * Uses FederatedZkClient (applicable on multi-realm mode only) that queries Metadata Store
-     * Directory Service for routing data
+     * Directory Service for routing data.
      */
     FEDERATED
   }
@@ -116,6 +123,13 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
   // ZkClient
   private final boolean _usesExternalZkClient;
 
+  /**
+   * @deprecated it is recommended to use the builder constructor {@link Builder}
+   * instead to avoid having to manually create and maintain a RealmAwareZkClient
+   * outside of ZkBaseDataAccessor.
+   *
+   * @param zkClient A created RealmAwareZkClient
+   */
   @Deprecated
   public ZkBaseDataAccessor(RealmAwareZkClient zkClient) {
     if (zkClient == null) {
@@ -125,13 +139,63 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
     _usesExternalZkClient = true;
   }
 
+  /**
+   * Creates an accessor around a ZkClient it builds itself from an already-validated Builder.
+   * @param builder supplies realm mode, client type, ZK address and client/connection configs
+   */
+  private ZkBaseDataAccessor(Builder builder) {
+    ZkClientType clientType = builder.zkClientType;
+    RealmAwareZkClient client;
+    switch (builder.realmMode) {
+      case MULTI_REALM:
+        // Realm-aware client: factory-built for DEDICATED/SHARED, FederatedZkClient otherwise.
+        try {
+          if (clientType == ZkClientType.DEDICATED) {
+            client = DedicatedZkClientFactory.getInstance().buildZkClient(
+                builder.realmAwareZkConnectionConfig, builder.realmAwareZkClientConfig);
+          } else if (clientType == ZkClientType.SHARED) {
+            client = SharedZkClientFactory.getInstance().buildZkClient(
+                builder.realmAwareZkConnectionConfig, builder.realmAwareZkClientConfig);
+          } else {
+            client = new FederatedZkClient(builder.realmAwareZkConnectionConfig,
+                builder.realmAwareZkClientConfig);
+          }
+        } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+          throw new HelixException("Not able to connect on multi-realm mode.", e);
+        }
+        break;
+
+      case SINGLE_REALM:
+        // Single ZK at builder.zkAddress: dedicated client on request, shared client otherwise.
+        if (clientType == ZkClientType.DEDICATED) {
+          client = DedicatedZkClientFactory.getInstance().buildZkClient(
+              new HelixZkClient.ZkConnectionConfig(builder.zkAddress),
+              builder.realmAwareZkClientConfig.createHelixZkClientConfig());
+        } else {
+          client = SharedZkClientFactory.getInstance().buildZkClient(
+              new HelixZkClient.ZkConnectionConfig(builder.zkAddress),
+              builder.realmAwareZkClientConfig.createHelixZkClientConfig());
+          client.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT,
+              TimeUnit.MILLISECONDS);
+        }
+        break;
+      default:
+        throw new HelixException("Invalid RealmMode given: " + builder.realmMode);
+    }
+    _zkClient = client;
+    _usesExternalZkClient = false;
+  }
+
   /**
    * The ZkBaseDataAccessor with custom serializer support of ZkSerializer type.
    * Note: This constructor will use a shared ZkConnection.
    * Do NOT use this for ephemeral node creation/callbacks/session management.
    * Do use this for simple CRUD operations to ZooKeeper.
    * @param zkAddress The zookeeper address
+   *
+   * @deprecated it is recommended to use the builder constructor {@link Builder}
    */
+  @Deprecated
   public ZkBaseDataAccessor(String zkAddress, ZkSerializer zkSerializer) {
     this(zkAddress, zkSerializer, ZkClientType.SHARED);
   }
@@ -142,7 +206,10 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
    * Do NOT use this for ephemeral node creation/callbacks/session management.
    * Do use this for simple CRUD operations to ZooKeeper.
    * @param zkAddress The zookeeper address
+   *
+   * @deprecated it is recommended to use the builder constructor {@link Builder}
    */
+  @Deprecated
   public ZkBaseDataAccessor(String zkAddress, PathBasedZkSerializer pathBasedZkSerializer) {
     this(zkAddress, pathBasedZkSerializer, ZkClientType.SHARED);
   }
@@ -153,7 +220,10 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
    * Does NOT support ephemeral node creation, callbacks, or session management.
    * Uses {@link ZNRecordSerializer} serializer
    * @param zkAddress The zookeeper address
+   *
+   * @deprecated it is recommended to use the builder constructor {@link Builder}
    */
+  @Deprecated
   public ZkBaseDataAccessor(String zkAddress) {
     this(zkAddress, new ZNRecordSerializer());
   }
@@ -166,7 +236,10 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
    * CRUD operations to ZooKeeper.
    * @param zkAddress
    * @param zkClientType
+   *
+   * @deprecated it is recommended to use the builder constructor {@link Builder}
    */
+  @Deprecated
   public ZkBaseDataAccessor(String zkAddress, ZkClientType zkClientType) {
     this(zkAddress, new ZNRecordSerializer(), zkClientType);
   }
@@ -179,21 +252,16 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
    * CRUD operations to ZooKeeper.
    * @param zkAddress
    * @param zkSerializer
+   *
+   * @deprecated it is recommended to use the builder constructor {@link Builder}
    */
+  @Deprecated
   public ZkBaseDataAccessor(String zkAddress, ZkSerializer zkSerializer,
       ZkClientType zkClientType) {
-    switch (zkClientType) {
-    case DEDICATED:
-      _zkClient = DedicatedZkClientFactory.getInstance().buildZkClient(
-          new HelixZkClient.ZkConnectionConfig(zkAddress),
-          new HelixZkClient.ZkClientConfig().setZkSerializer(zkSerializer));
-      break;
-    case SHARED:
-    default:
-      _zkClient = SharedZkClientFactory.getInstance().buildZkClient(
-          new HelixZkClient.ZkConnectionConfig(zkAddress),
-          new HelixZkClient.ZkClientConfig().setZkSerializer(zkSerializer));
-    }
+    RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
+        new RealmAwareZkClient.RealmAwareZkClientConfig().setZkSerializer(zkSerializer);
+
+    _zkClient = buildRealmAwareZkClient(clientConfig, zkAddress, zkClientType);
     _usesExternalZkClient = false;
   }
 
@@ -206,21 +274,16 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
    * @param zkAddress
    * @param pathBasedZkSerializer
    * @param zkClientType
+   *
+   * @deprecated it is recommended to use the builder constructor {@link Builder}
    */
+  @Deprecated
   public ZkBaseDataAccessor(String zkAddress, PathBasedZkSerializer pathBasedZkSerializer,
       ZkClientType zkClientType) {
-    switch (zkClientType) {
-    case DEDICATED:
-      _zkClient = DedicatedZkClientFactory.getInstance().buildZkClient(
-          new HelixZkClient.ZkConnectionConfig(zkAddress),
-          new HelixZkClient.ZkClientConfig().setZkSerializer(pathBasedZkSerializer));
-      break;
-    case SHARED:
-    default:
-      _zkClient = SharedZkClientFactory.getInstance().buildZkClient(
-          new HelixZkClient.ZkConnectionConfig(zkAddress),
-          new HelixZkClient.ZkClientConfig().setZkSerializer(pathBasedZkSerializer));
-    }
+    RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
+        new RealmAwareZkClient.RealmAwareZkClientConfig().setZkSerializer(pathBasedZkSerializer);
+
+    _zkClient = buildRealmAwareZkClient(clientConfig, zkAddress, zkClientType);
     _usesExternalZkClient = false;
   }
 
@@ -1256,4 +1319,146 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
       _zkClient.close();
     }
   }
+
+  // TODO: refactor Builder class to remove duplicate code with other Helix Java APIs
+  /** Builder for a ZkBaseDataAccessor that creates its own ZkClient; see {@link #build()}. */
+  public static class Builder {
+    private String zkAddress;
+    private RealmAwareZkClient.RealmMode realmMode;
+    private ZkBaseDataAccessor.ZkClientType zkClientType;
+    private RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig;
+    private RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig;
+
+    public Builder() {
+    }
+
+    public ZkBaseDataAccessor.Builder setZkAddress(String zkAddress) {
+      this.zkAddress = zkAddress;
+      return this;
+    }
+
+    public ZkBaseDataAccessor.Builder setRealmMode(RealmAwareZkClient.RealmMode realmMode) {
+      this.realmMode = realmMode;
+      return this;
+    }
+
+    public ZkBaseDataAccessor.Builder setZkClientType(
+        ZkBaseDataAccessor.ZkClientType zkClientType) {
+      this.zkClientType = zkClientType;
+      return this;
+    }
+
+    public ZkBaseDataAccessor.Builder setRealmAwareZkConnectionConfig(
+        RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig) {
+      this.realmAwareZkConnectionConfig = realmAwareZkConnectionConfig;
+      return this;
+    }
+
+    public ZkBaseDataAccessor.Builder setRealmAwareZkClientConfig(
+        RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig) {
+      this.realmAwareZkClientConfig = realmAwareZkClientConfig;
+      return this;
+    }
+
+    /**
+     * Validates the configured options and returns a <code>ZkBaseDataAccessor</code> instance.
+     * <p>
+     * Note: in multi-realm mode, the accessor can reach multiple realms if and only if the ZK
+     * client type is <code>FEDERATED</code>; otherwise it can only access a single realm.
+     * @throws HelixException if the options are inconsistent or the ZkClient cannot be created
+     */
+    public ZkBaseDataAccessor<?> build() {
+      validate();
+      return new ZkBaseDataAccessor<>(this);
+    }
+
+    /*
+     * Validates the given parameters before building an instance of ZkBaseDataAccessor, and fills
+     * in whatever was left unset: client type, realm mode, client config and connection config.
+     */
+    private void validate() {
+      boolean isZkAddressSet = zkAddress != null && !zkAddress.isEmpty();
+      boolean isZkClientTypeSet = zkClientType != null;
+
+      // If ZkClientType is set, RealmMode must either be single-realm or not set.
+      if (isZkClientTypeSet && realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM) {
+        throw new HelixException("ZkClientType cannot be set on multi-realm mode!");
+      }
+      // If ZkClientType is not set, default to SHARED
+      if (!isZkClientTypeSet) {
+        zkClientType = ZkBaseDataAccessor.ZkClientType.SHARED;
+      }
+
+      if (realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM && !isZkAddressSet) {
+        throw new HelixException(
+            "RealmMode cannot be single-realm without a valid ZkAddress set!");
+      }
+
+      if (realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM && isZkAddressSet) {
+        throw new HelixException("ZkAddress cannot be set on multi-realm mode!");
+      }
+
+      // Infer an unset RealmMode from whether a ZkAddress was given.
+      if (realmMode == null) {
+        realmMode = isZkAddressSet ? RealmAwareZkClient.RealmMode.SINGLE_REALM
+            : RealmAwareZkClient.RealmMode.MULTI_REALM;
+      }
+
+      // Checked against the resolved RealmMode so that an inferred single-realm mode rejects
+      // FEDERATED too, instead of silently falling back to a shared client.
+      if (realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM
+          && zkClientType == ZkClientType.FEDERATED) {
+        throw new HelixException("FederatedZkClient cannot be set on single-realm mode!");
+      }
+
+      // Resolve RealmAwareZkClientConfig
+      if (realmAwareZkClientConfig == null) {
+        realmAwareZkClientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig();
+      }
+
+      // Resolve RealmAwareZkConnectionConfig: if not set, create a default one
+      if (realmAwareZkConnectionConfig == null) {
+        realmAwareZkConnectionConfig =
+            new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build();
+      }
+    }
+  }
+
+  /*
+   * Creates the ZkClient for the constructors that do not take a Builder, which are kept for
+   * backward-compatibility. When the system property named by SystemPropertyKeys.MULTI_ZK_ENABLED
+   * is true, a FederatedZkClient with a default connection config is returned and zkAddress and
+   * zkClientType are not used. Otherwise a dedicated client is built for DEDICATED and a shared
+   * client, waited on until connected, for any other type.
+   */
+  private RealmAwareZkClient buildRealmAwareZkClient(
+      RealmAwareZkClient.RealmAwareZkClientConfig clientConfig, String zkAddress,
+      ZkClientType zkClientType) {
+    boolean multiZkEnabled = Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED);
+    if (multiZkEnabled) {
+      try {
+        RealmAwareZkClient.RealmAwareZkConnectionConfig defaultConnectionConfig =
+            new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build();
+        return new FederatedZkClient(defaultConnectionConfig, clientConfig);
+      } catch (IllegalStateException | IOException | InvalidRoutingDataException e) {
+        throw new HelixException("Not able to connect on multi-realm mode.", e);
+      }
+    }
+
+    // Single-ZK path: pick the client factory by the requested client type.
+    switch (zkClientType) {
+      case DEDICATED:
+        return DedicatedZkClientFactory.getInstance().buildZkClient(
+            new HelixZkClient.ZkConnectionConfig(zkAddress),
+            clientConfig.createHelixZkClientConfig());
+      case SHARED:
+      default:
+        RealmAwareZkClient sharedClient = SharedZkClientFactory.getInstance().buildZkClient(
+            new HelixZkClient.ZkConnectionConfig(zkAddress),
+            clientConfig.createHelixZkClientConfig());
+        sharedClient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT,
+            TimeUnit.MILLISECONDS);
+        return sharedClient;
+    }
+  }
 }