You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/04/08 22:54:06 UTC

[helix] 36/50: Make ZkCacheBaseDataAccessor and ZkHelixPropertyStore realm-aware (#863)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch zooscalability_merge
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 75fa3c45a10627670b60a5eaee4e146a460088ff
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Wed Mar 11 19:44:26 2020 -0700

    Make ZkCacheBaseDataAccessor and ZkHelixPropertyStore realm-aware (#863)
    
    This commit makes both ZkCacheBaseDataAccessor and ZkHelixPropertyStore realm-aware by choosing the appropriate realm-aware ZkClients in the constructor. Also, we add a Builder here to give users options to set Connection config and Client config.
    Note that ZkHelixPropertyStore extends CacheBaseDataAccessor so there is no change needed.
---
 .../helix/manager/zk/ZkBaseDataAccessor.java       |  11 +-
 .../helix/manager/zk/ZkCacheBaseDataAccessor.java  | 274 ++++++++++++++++++---
 2 files changed, 243 insertions(+), 42 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index f08ba55..f287c22 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -58,6 +58,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
 
   // Designates which mode ZkBaseDataAccessor should be created in. If not specified, it will be
   // created on SHARED mode.
+  // TODO: move this to RealmAwareZkClient
   public enum ZkClientType {
     /*
      * When ZkBaseDataAccessor is created with the DEDICATED type, it supports ephemeral node
@@ -70,7 +71,12 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
      * When ZkBaseDataAccessor is created with the SHARED type, it only supports CRUD
      * functionalities. This will be the default mode of creation.
      */
-    SHARED
+    SHARED,
+    /*
+     * Uses FederatedZkClient (applicable on multi-realm mode only) that queries Metadata Store
+     * Directory Service for routing data
+     */
+    FEDERATED
   }
 
   enum RetCode {
@@ -104,7 +110,8 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
   private static Logger LOG = LoggerFactory.getLogger(ZkBaseDataAccessor.class);
 
   private final RealmAwareZkClient _zkClient;
-  // true if ZkBaseDataAccessor was instantiated with a HelixZkClient, false otherwise
+
+  // true if ZkBaseDataAccessor was instantiated with a RealmAwareZkClient, false otherwise
   // This is used for close() to determine how ZkBaseDataAccessor should close the underlying
   // ZkClient
   private final boolean _usesExternalZkClient;
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
index 6d2c5cf..bd05ea7 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
@@ -19,10 +19,10 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -31,12 +31,16 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixException;
+import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor.RetCode;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
 import org.apache.helix.store.HelixPropertyListener;
 import org.apache.helix.store.HelixPropertyStore;
 import org.apache.helix.store.zk.ZNode;
 import org.apache.helix.util.PathUtils;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
 import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
 import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
@@ -58,7 +62,15 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
   protected ZkCallbackCache<T> _zkCache;
 
   final ZkBaseDataAccessor<T> _baseAccessor;
-  final Map<String, Cache<T>> _cacheMap;
+
+  // TODO: need to make sure no overlap between wtCachePaths and zkCachePaths
+  // TreeMap key is ordered by key string length, so more general (i.e. short) prefix
+  // comes first
+  final Map<String, Cache<T>> _cacheMap = new TreeMap<>((o1, o2) -> {
+    int len1 = o1.split("/").length;
+    int len2 = o2.split("/").length;
+    return len1 - len2;
+  });
 
   final String _chrootPath;
   final List<String> _wtCachePaths;
@@ -70,12 +82,14 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
   private final ReentrantLock _eventLock = new ReentrantLock();
   private ZkCacheEventThread _eventThread;
 
-  private HelixZkClient _zkClient = null;
+  private RealmAwareZkClient _zkClient;
 
+  @Deprecated
   public ZkCacheBaseDataAccessor(ZkBaseDataAccessor<T> baseAccessor, List<String> wtCachePaths) {
     this(baseAccessor, null, wtCachePaths, null);
   }
 
+  @Deprecated
   public ZkCacheBaseDataAccessor(ZkBaseDataAccessor<T> baseAccessor, String chrootPath,
       List<String> wtCachePaths, List<String> zkCachePaths) {
     _baseAccessor = baseAccessor;
@@ -90,50 +104,62 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
     _wtCachePaths = wtCachePaths;
     _zkCachePaths = zkCachePaths;
 
-    // TODO: need to make sure no overlap between wtCachePaths and zkCachePaths
-    // TreeMap key is ordered by key string length, so more general (i.e. short) prefix
-    // comes first
-    _cacheMap = new TreeMap<>(new Comparator<String>() {
-      @Override
-      public int compare(String o1, String o2) {
-        int len1 = o1.split("/").length;
-        int len2 = o2.split("/").length;
-        return len1 - len2;
-      }
-    });
-
     start();
   }
 
+  @Deprecated
   public ZkCacheBaseDataAccessor(String zkAddress, ZkSerializer serializer, String chrootPath,
       List<String> wtCachePaths, List<String> zkCachePaths) {
     this(zkAddress, serializer, chrootPath, wtCachePaths, zkCachePaths, null, null,
         ZkBaseDataAccessor.ZkClientType.SHARED);
   }
 
+  @Deprecated
   public ZkCacheBaseDataAccessor(String zkAddress, ZkSerializer serializer, String chrootPath,
       List<String> wtCachePaths, List<String> zkCachePaths, String monitorType, String monitorkey) {
     this(zkAddress, serializer, chrootPath, wtCachePaths, zkCachePaths, monitorType, monitorkey,
         ZkBaseDataAccessor.ZkClientType.SHARED);
   }
 
+  @Deprecated
   public ZkCacheBaseDataAccessor(String zkAddress, ZkSerializer serializer, String chrootPath,
       List<String> wtCachePaths, List<String> zkCachePaths, String monitorType, String monitorkey,
       ZkBaseDataAccessor.ZkClientType zkClientType) {
-    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
-    clientConfig.setZkSerializer(serializer).setMonitorType(monitorType).setMonitorKey(monitorkey);
-    switch (zkClientType) {
-    case DEDICATED:
-      _zkClient = DedicatedZkClientFactory.getInstance().buildZkClient(
-          new HelixZkClient.ZkConnectionConfig(zkAddress),
-          new HelixZkClient.ZkClientConfig().setZkSerializer(serializer));
-      break;
-    case SHARED:
-    default:
-      _zkClient = SharedZkClientFactory.getInstance()
-          .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), clientConfig);
-    }
-    _zkClient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+
+    // If the multi ZK config is enabled, use multi-realm mode with FederatedZkClient
+    if (Boolean.parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) {
+      try {
+        RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder =
+            new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
+        RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
+            new RealmAwareZkClient.RealmAwareZkClientConfig();
+        clientConfig.setZkSerializer(serializer).setMonitorType(monitorType)
+            .setMonitorKey(monitorkey);
+        // Use a federated zk client
+        _zkClient = new FederatedZkClient(connectionConfigBuilder.build(), clientConfig);
+      } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+        // Note: IllegalStateException is for HttpRoutingDataReader if MSDS endpoint cannot be
+        // found
+        throw new HelixException("Failed to create ZkCacheBaseDataAccessor!", e);
+      }
+    } else {
+      HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+      clientConfig.setZkSerializer(serializer).setMonitorType(monitorType)
+          .setMonitorKey(monitorkey);
+      switch (zkClientType) {
+        case DEDICATED:
+          _zkClient = DedicatedZkClientFactory.getInstance()
+              .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
+                  new HelixZkClient.ZkClientConfig().setZkSerializer(serializer));
+          break;
+        case SHARED:
+        default:
+          _zkClient = SharedZkClientFactory.getInstance()
+              .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), clientConfig);
+      }
+      _zkClient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+    }
+
     _baseAccessor = new ZkBaseDataAccessor<>(_zkClient);
 
     if (chrootPath == null || chrootPath.equals("/")) {
@@ -146,17 +172,67 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
     _wtCachePaths = wtCachePaths;
     _zkCachePaths = zkCachePaths;
 
-    // TODO: need to make sure no overlap between wtCachePaths and zkCachePaths
-    // TreeMap key is ordered by key string length, so more general (i.e. short) prefix
-    // comes first
-    _cacheMap = new TreeMap<>(new Comparator<String>() {
-      @Override
-      public int compare(String o1, String o2) {
-        int len1 = o1.split("/").length;
-        int len2 = o2.split("/").length;
-        return len1 - len2;
-      }
-    });
+    start();
+  }
+
+  /**
+   * Constructor using a Builder that allows users to set connection and client configs.
+   * @param builder
+   */
+  private ZkCacheBaseDataAccessor(Builder builder) {
+    _chrootPath = builder._chrootPath;
+    _wtCachePaths = builder._wtCachePaths;
+    _zkCachePaths = builder._zkCachePaths;
+
+    RealmAwareZkClient zkClient;
+    switch (builder._realmMode) {
+      case MULTI_REALM:
+        try {
+          if (builder._zkClientType == ZkBaseDataAccessor.ZkClientType.DEDICATED) {
+            // Use a realm-aware dedicated zk client
+            zkClient = DedicatedZkClientFactory.getInstance()
+                .buildZkClient(builder._realmAwareZkConnectionConfig,
+                    builder._realmAwareZkClientConfig);
+          } else if (builder._zkClientType == ZkBaseDataAccessor.ZkClientType.SHARED) {
+            // Use a realm-aware shared zk client
+            zkClient = SharedZkClientFactory.getInstance()
+                .buildZkClient(builder._realmAwareZkConnectionConfig,
+                    builder._realmAwareZkClientConfig);
+          } else {
+            // Use a federated zk client
+            zkClient = new FederatedZkClient(builder._realmAwareZkConnectionConfig,
+                builder._realmAwareZkClientConfig);
+          }
+          break; // Must break out of the switch statement here since zkClient has been created
+        } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+          // Note: IllegalStateException is for HttpRoutingDataReader if MSDS endpoint cannot be
+          // found
+          throw new HelixException("Failed to create ZkCacheBaseDataAccessor!", e);
+        }
+      case SINGLE_REALM:
+        switch (builder._zkClientType) {
+          case DEDICATED:
+            // If DEDICATED, then we use a dedicated HelixZkClient because we must support ephemeral
+            // operations
+            zkClient = DedicatedZkClientFactory.getInstance()
+                .buildZkClient(new HelixZkClient.ZkConnectionConfig(builder._zkAddress),
+                    builder._realmAwareZkClientConfig.createHelixZkClientConfig());
+            break;
+          case SHARED:
+          default:
+            zkClient = SharedZkClientFactory.getInstance()
+                .buildZkClient(new HelixZkClient.ZkConnectionConfig(builder._zkAddress),
+                    builder._realmAwareZkClientConfig.createHelixZkClientConfig());
+            zkClient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT,
+                TimeUnit.MILLISECONDS);
+            break;
+        }
+      default:
+        throw new HelixException("Invalid RealmMode given: " + builder._realmMode);
+    }
+
+    _zkClient = zkClient;
+    _baseAccessor = new ZkBaseDataAccessor<>(_zkClient);
 
     start();
   }
@@ -842,4 +918,122 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
       _zkClient.close();
     }
   }
+
+  public static class Builder {
+    private String _zkAddress;
+    private RealmAwareZkClient.RealmMode _realmMode;
+    private RealmAwareZkClient.RealmAwareZkConnectionConfig _realmAwareZkConnectionConfig;
+    private RealmAwareZkClient.RealmAwareZkClientConfig _realmAwareZkClientConfig;
+
+    /** ZkCacheBaseDataAccessor-specific parameters */
+    private String _chrootPath;
+    private List<String> _wtCachePaths;
+    private List<String> _zkCachePaths;
+    private ZkBaseDataAccessor.ZkClientType _zkClientType;
+
+    public Builder() {
+    }
+
+    public Builder setZkAddress(String zkAddress) {
+      _zkAddress = zkAddress;
+      return this;
+    }
+
+    public Builder setRealmMode(RealmAwareZkClient.RealmMode realmMode) {
+      _realmMode = realmMode;
+      return this;
+    }
+
+    public Builder setRealmAwareZkConnectionConfig(
+        RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig) {
+      _realmAwareZkConnectionConfig = realmAwareZkConnectionConfig;
+      return this;
+    }
+
+    public Builder setRealmAwareZkClientConfig(
+        RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig) {
+      _realmAwareZkClientConfig = realmAwareZkClientConfig;
+      return this;
+    }
+
+    public Builder setChrootPath(String chrootPath) {
+      _chrootPath = chrootPath;
+      return this;
+    }
+
+    public Builder setWtCachePaths(List<String> wtCachePaths) {
+      _wtCachePaths = wtCachePaths;
+      return this;
+    }
+
+    public Builder setZkCachePaths(List<String> zkCachePaths) {
+      _zkCachePaths = zkCachePaths;
+      return this;
+    }
+
+    /**
+     * Sets the ZkClientType. If this is set, ZkCacheBaseDataAccessor will be created on
+     * single-realm mode.
+     * @param zkClientType
+     * @return
+     */
+    public Builder setZkClientType(ZkBaseDataAccessor.ZkClientType zkClientType) {
+      _zkClientType = zkClientType;
+      return this;
+    }
+
+    public ZkCacheBaseDataAccessor build() {
+      validate();
+      return new ZkCacheBaseDataAccessor(this);
+    }
+
+    private void validate() {
+      // Resolve RealmMode based on other parameters
+      boolean isZkAddressSet = _zkAddress != null && !_zkAddress.isEmpty();
+      boolean isZkClientTypeSet = _zkClientType != null;
+
+      // If ZkClientType is set, RealmMode must either be single-realm or not set.
+      if (isZkClientTypeSet && _realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM) {
+        throw new HelixException(
+            "ZkCacheBaseDataAccessor: you cannot set ZkClientType on multi-realm mode!");
+      }
+      // If ZkClientType is not set, default to SHARED
+      if (!isZkClientTypeSet) {
+        _zkClientType = ZkBaseDataAccessor.ZkClientType.SHARED;
+      }
+
+      if (_realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM && !isZkAddressSet) {
+        throw new HelixException(
+            "ZkCacheBaseDataAccessor: RealmMode cannot be single-realm without a valid ZkAddress set!");
+      }
+
+      if (_realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM && isZkAddressSet) {
+        throw new HelixException(
+            "ZkCacheBaseDataAccessor: You cannot set the ZkAddress on multi-realm mode!");
+      }
+
+      if (_realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM
+          && _zkClientType == ZkBaseDataAccessor.ZkClientType.FEDERATED) {
+        throw new HelixException(
+            "ZkCacheBaseDataAccessor: You cannot use FederatedZkClient on single-realm mode!");
+      }
+
+      if (_realmMode == null) {
+        _realmMode = isZkAddressSet ? RealmAwareZkClient.RealmMode.SINGLE_REALM
+            : RealmAwareZkClient.RealmMode.MULTI_REALM;
+      }
+
+      // Resolve RealmAwareZkClientConfig
+      if (_realmAwareZkClientConfig == null) {
+        _realmAwareZkClientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig();
+      }
+
+      // Resolve RealmAwareZkConnectionConfig
+      if (_realmAwareZkConnectionConfig == null) {
+        // If not set, create a default one
+        _realmAwareZkConnectionConfig =
+            new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build();
+      }
+    }
+  }
 }