You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2019/12/13 00:24:35 UTC

[helix] branch master updated: Add an option for using dedicated ZkConnection in ZkBaseDataAccessor (#655)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 2416922  Add an option for using dedicated ZkConnection in ZkBaseDataAccessor (#655)
2416922 is described below

commit 2416922793e0138ef054186710714853db2ddb4e
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Thu Dec 12 16:24:26 2019 -0800

    Add an option for using dedicated ZkConnection in ZkBaseDataAccessor (#655)
    
    There are usage patterns where a user needs to create ephemeral nodes using a ZkBaseDataAccessor. We need to support this use case since we want users to avoid using ZkClient directly, so we do it in ZkBaseDataAccessor, which is Helix's wrapper API around the raw ZkClient.
    
    The default behavior would be to use a shared ZK connection resource.
---
 .../helix/manager/zk/ZkBaseDataAccessor.java       | 107 +++++++++++++++++++--
 .../helix/manager/zk/ZkCacheBaseDataAccessor.java  |  29 ++++--
 .../helix/store/zk/ZkHelixPropertyStore.java       |   5 +
 3 files changed, 124 insertions(+), 17 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index fb2749e..c1abccb 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -44,6 +44,7 @@ import org.apache.helix.manager.zk.ZkAsyncCallbacks.DeleteCallbackHandler;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks.ExistsCallbackHandler;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks.GetDataCallbackHandler;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks.SetDataCallbackHandler;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
 import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.store.zk.ZNode;
@@ -56,6 +57,24 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
+
+  // Designates which mode ZkBaseDataAccessor should be created in. If no type is specified, the
+  // accessor is created in SHARED mode.
+  public enum ZkClientType {
+    /**
+     * When ZkBaseDataAccessor is created with the DEDICATED type, it supports ephemeral node
+     * creation, callback functionality, and session management. Note that this mode is more
+     * resource-heavy because it creates a dedicated ZK connection, so it should be used sparingly
+     * and only when the aforementioned features are needed.
+     */
+    DEDICATED,
+    /**
+     * When ZkBaseDataAccessor is created with the SHARED type, it only supports CRUD
+     * functionalities. This is the default mode of creation.
+     */
+    SHARED
+  }
+
   enum RetCode {
     OK,
     NODE_EXISTS,
@@ -103,28 +122,30 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
 
   /**
    * The ZkBaseDataAccessor with custom serializer support of ZkSerializer type.
+   * Note: This constructor will use a shared ZkConnection.
+   * Do NOT use this for ephemeral node creation/callbacks/session management.
+   * Do use this for simple CRUD operations to ZooKeeper.
    * @param zkAddress The zookeeper address
+   * @param zkSerializer The custom ZkSerializer implementation to use
    */
   public ZkBaseDataAccessor(String zkAddress, ZkSerializer zkSerializer) {
-    _zkClient = SharedZkClientFactory.getInstance().buildZkClient(
-        new HelixZkClient.ZkConnectionConfig(zkAddress),
-        new HelixZkClient.ZkClientConfig().setZkSerializer(zkSerializer));
-    _usesExternalZkClient = false;
+    this(zkAddress, zkSerializer, ZkClientType.SHARED);
   }
 
   /**
    * The ZkBaseDataAccessor with custom serializer support of PathBasedZkSerializer type.
+   * Note: This constructor will use a shared ZkConnection.
+   * Do NOT use this for ephemeral node creation/callbacks/session management.
+   * Do use this for simple CRUD operations to ZooKeeper.
    * @param zkAddress The zookeeper address
+   * @param pathBasedZkSerializer The custom PathBasedZkSerializer implementation to use
    */
-  public ZkBaseDataAccessor(String zkAddress, PathBasedZkSerializer zkSerializer) {
-    _zkClient = SharedZkClientFactory.getInstance().buildZkClient(
-        new HelixZkClient.ZkConnectionConfig(zkAddress),
-        new HelixZkClient.ZkClientConfig().setZkSerializer(zkSerializer));
-    _usesExternalZkClient = false;
+  public ZkBaseDataAccessor(String zkAddress, PathBasedZkSerializer pathBasedZkSerializer) {
+    this(zkAddress, pathBasedZkSerializer, ZkClientType.SHARED);
   }
 
   /**
-   * The default ZkBaseDataAccessor with {@link org.apache.helix.ZNRecord} as the data model;
+   * Creates a ZkBaseDataAccessor with {@link org.apache.helix.ZNRecord} as the data model.
+   * Uses a shared ZkConnection resource.
+   * Does NOT support ephemeral node creation, callbacks, or session management.
    * Uses {@link ZNRecordSerializer} serializer
    * @param zkAddress The zookeeper address
    */
@@ -133,6 +154,72 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
   }
 
   /**
+   * Creates a ZkBaseDataAccessor with {@link org.apache.helix.ZNRecord} as the data model.
+   * Uses a {@link ZNRecordSerializer} as the serializer.
+   * If DEDICATED, it will use a dedicated ZkConnection, which allows ephemeral
+   * node creation, callbacks, and session management.
+   * If SHARED, it will use a shared ZkConnection, which only allows simple
+   * CRUD operations to ZooKeeper.
+   * @param zkAddress The zookeeper address
+   * @param zkClientType Whether to use a DEDICATED or a SHARED ZkConnection
+   */
+  public ZkBaseDataAccessor(String zkAddress, ZkClientType zkClientType) {
+    this(zkAddress, new ZNRecordSerializer(), zkClientType);
+  }
+
+  /**
+   * Creates a ZkBaseDataAccessor that serializes data with a custom ZkSerializer.
+   * With DEDICATED, the underlying client is built by the dedicated client factory and owns its
+   * ZkConnection, which allows ephemeral node creation, callbacks, and session management.
+   * With SHARED, the underlying client is built by the shared client factory and only simple
+   * CRUD operations to ZooKeeper are allowed.
+   * @param zkAddress The zookeeper address
+   * @param zkSerializer The custom ZkSerializer implementation to use
+   * @param zkClientType Whether to use a DEDICATED or a SHARED ZkConnection
+   */
+  public ZkBaseDataAccessor(String zkAddress, ZkSerializer zkSerializer,
+      ZkClientType zkClientType) {
+    // Both client types are built from the same connection and client configuration.
+    HelixZkClient.ZkConnectionConfig connectionConfig =
+        new HelixZkClient.ZkConnectionConfig(zkAddress);
+    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+    clientConfig.setZkSerializer(zkSerializer);
+    switch (zkClientType) {
+    case DEDICATED:
+      _zkClient =
+          DedicatedZkClientFactory.getInstance().buildZkClient(connectionConfig, clientConfig);
+      break;
+    case SHARED:
+    default:
+      _zkClient =
+          SharedZkClientFactory.getInstance().buildZkClient(connectionConfig, clientConfig);
+      break;
+    }
+    // The client was created here rather than supplied by the caller.
+    _usesExternalZkClient = false;
+  }
+
+  /**
+   * Creates a ZkBaseDataAccessor that serializes data with a custom PathBasedZkSerializer.
+   * With DEDICATED, the underlying client is built by the dedicated client factory and owns its
+   * ZkConnection, which allows ephemeral node creation, callbacks, and session management.
+   * With SHARED, the underlying client is built by the shared client factory and only simple
+   * CRUD operations to ZooKeeper are allowed.
+   * @param zkAddress The zookeeper address
+   * @param pathBasedZkSerializer The custom PathBasedZkSerializer implementation to use
+   * @param zkClientType Whether to use a DEDICATED or a SHARED ZkConnection
+   */
+  public ZkBaseDataAccessor(String zkAddress, PathBasedZkSerializer pathBasedZkSerializer,
+      ZkClientType zkClientType) {
+    // Both client types are built from the same connection and client configuration.
+    HelixZkClient.ZkConnectionConfig connectionConfig =
+        new HelixZkClient.ZkConnectionConfig(zkAddress);
+    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+    clientConfig.setZkSerializer(pathBasedZkSerializer);
+    switch (zkClientType) {
+    case DEDICATED:
+      _zkClient =
+          DedicatedZkClientFactory.getInstance().buildZkClient(connectionConfig, clientConfig);
+      break;
+    case SHARED:
+    default:
+      _zkClient =
+          SharedZkClientFactory.getInstance().buildZkClient(connectionConfig, clientConfig);
+      break;
+    }
+    // The client was created here rather than supplied by the caller.
+    _usesExternalZkClient = false;
+  }
+
+  /**
    * sync create
    */
   @Override
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
index ce4fc94..b230827 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
@@ -38,6 +38,7 @@ import org.apache.helix.AccessOption;
 import org.apache.helix.HelixException;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor.RetCode;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
 import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.store.HelixPropertyListener;
@@ -106,18 +107,32 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
 
+  /**
+   * Creates a ZkCacheBaseDataAccessor without a monitor type or monitor key (both are passed as
+   * null), using ZkClientType.SHARED.
+   */
   public ZkCacheBaseDataAccessor(String zkAddress, ZkSerializer serializer, String chrootPath,
       List<String> wtCachePaths, List<String> zkCachePaths) {
-    this(zkAddress, serializer, chrootPath, wtCachePaths, zkCachePaths, null, null);
+    this(zkAddress, serializer, chrootPath, wtCachePaths, zkCachePaths, null, null,
+        ZkBaseDataAccessor.ZkClientType.SHARED);
   }
 
+  /**
+   * Creates a ZkCacheBaseDataAccessor with the given monitor type and monitor key, delegating to
+   * the ZkClientType-aware constructor with ZkClientType.SHARED.
+   */
   public ZkCacheBaseDataAccessor(String zkAddress, ZkSerializer serializer, String chrootPath,
       List<String> wtCachePaths, List<String> zkCachePaths, String monitorType, String monitorkey) {
-    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
-    clientConfig.setZkSerializer(serializer)
-        .setMonitorType(monitorType)
-        .setMonitorKey(monitorkey);
-    _zkClient = SharedZkClientFactory.getInstance()
-        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), clientConfig);
+    this(zkAddress, serializer, chrootPath, wtCachePaths, zkCachePaths, monitorType, monitorkey,
+        ZkBaseDataAccessor.ZkClientType.SHARED);
+  }
 
+  /**
+   * Creates a ZkCacheBaseDataAccessor backed by either a dedicated or a shared ZkClient.
+   * The serializer, monitor type and monitor key are applied to the client configuration in
+   * both modes.
+   * @param zkAddress The zookeeper address
+   * @param serializer The ZkSerializer set on the ZkClient configuration
+   * @param chrootPath presumably the root path this accessor operates under (TODO confirm)
+   * @param wtCachePaths presumably the paths served by the write-through cache (TODO confirm)
+   * @param zkCachePaths presumably the paths served by the ZK cache (TODO confirm)
+   * @param monitorType The monitor type set on the ZkClient configuration
+   * @param monitorkey The monitor key set on the ZkClient configuration
+   * @param zkClientType DEDICATED to build the client with DedicatedZkClientFactory; any other
+   *          value builds it with SharedZkClientFactory
+   */
+  public ZkCacheBaseDataAccessor(String zkAddress, ZkSerializer serializer, String chrootPath,
+      List<String> wtCachePaths, List<String> zkCachePaths, String monitorType, String monitorkey,
+      ZkBaseDataAccessor.ZkClientType zkClientType) {
+    HelixZkClient.ZkConnectionConfig connectionConfig =
+        new HelixZkClient.ZkConnectionConfig(zkAddress);
+    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+    clientConfig.setZkSerializer(serializer).setMonitorType(monitorType).setMonitorKey(monitorkey);
+    switch (zkClientType) {
+    case DEDICATED:
+      // Use the fully populated clientConfig here as well; building a fresh config with only the
+      // serializer would silently drop the caller's monitorType and monitorkey.
+      _zkClient =
+          DedicatedZkClientFactory.getInstance().buildZkClient(connectionConfig, clientConfig);
+      break;
+    case SHARED:
+    default:
+      _zkClient =
+          SharedZkClientFactory.getInstance().buildZkClient(connectionConfig, clientConfig);
+    }
     _zkClient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
     _baseAccessor = new ZkBaseDataAccessor<>(_zkClient);
 
diff --git a/helix-core/src/main/java/org/apache/helix/store/zk/ZkHelixPropertyStore.java b/helix-core/src/main/java/org/apache/helix/store/zk/ZkHelixPropertyStore.java
index 0e91314..a6e7a9c 100644
--- a/helix-core/src/main/java/org/apache/helix/store/zk/ZkHelixPropertyStore.java
+++ b/helix-core/src/main/java/org/apache/helix/store/zk/ZkHelixPropertyStore.java
@@ -41,4 +41,9 @@ public class ZkHelixPropertyStore<T> extends ZkCacheBaseDataAccessor<T> {
   public ZkHelixPropertyStore(String zkAddress, ZkSerializer serializer, String chrootPath) {
     super(zkAddress, serializer, chrootPath, null, null, MONITOR_TYPE, chrootPath);
   }
+
+  /**
+   * Creates a ZkHelixPropertyStore whose underlying ZkClient is of the given ZkClientType.
+   * Passes MONITOR_TYPE as the monitor type and chrootPath as the monitor key to the parent
+   * constructor, with null for both cache path lists.
+   * @param zkAddress The zookeeper address
+   * @param serializer The ZkSerializer passed to the parent constructor
+   * @param chrootPath The chroot path; also passed to the parent as the monitor key
+   * @param zkClientType The ZkClientType passed to the parent constructor
+   */
+  public ZkHelixPropertyStore(String zkAddress, ZkSerializer serializer, String chrootPath,
+      ZkBaseDataAccessor.ZkClientType zkClientType) {
+    super(zkAddress, serializer, chrootPath, null, null, MONITOR_TYPE, chrootPath, zkClientType);
+  }
 }