You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/04/08 22:54:12 UTC

[helix] 42/50: Make ZkBucketDataAccessor realm-aware (#894)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch zooscalability_merge
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 71fb43622aa34d93ad0323d57dd3d00b192318b8
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Fri Mar 13 17:30:32 2020 -0700

    Make ZkBucketDataAccessor realm-aware (#894)
    
    Because Helix Controller now uses WAGED rebalancer as the default rebalancer, it tries to create an instance of ZkBucketDataAccessor. This will fail unless ZkBucketDataAccessor was also made realm-aware. We can simply use a FederatedZkClient here since BucketDataAccessor does not support ephemeral operations.
---
 .../helix/manager/zk/ZkBucketDataAccessor.java     | 24 +++++++++++++++++++---
 1 file changed, 21 insertions(+), 3 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
index ffed9f3..1ebab28 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
@@ -34,9 +34,13 @@ import org.apache.helix.AccessOption;
 import org.apache.helix.BucketDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
 import org.apache.helix.util.GZipCompressionUtil;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
 import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
 import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
@@ -63,7 +67,7 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
   private final int _bucketSize;
   private final long _versionTTL;
   private ZkSerializer _zkSerializer;
-  private HelixZkClient _zkClient;
+  private RealmAwareZkClient _zkClient;
   private ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor;
 
   /**
@@ -73,8 +77,22 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
    * @param versionTTL in ms
    */
   public ZkBucketDataAccessor(String zkAddr, int bucketSize, long versionTTL) {
-    _zkClient = DedicatedZkClientFactory.getInstance()
-        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
+    if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+      try {
+        // Create realm-aware ZkClient.
+        RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
+            new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build();
+        RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
+            new RealmAwareZkClient.RealmAwareZkClientConfig();
+        _zkClient = new FederatedZkClient(connectionConfig, clientConfig);
+      } catch (IllegalArgumentException | IOException | InvalidRoutingDataException e) {
+        throw new HelixException("Not able to connect on realm-aware mode", e);
+      }
+    } else {
+      _zkClient = DedicatedZkClientFactory.getInstance()
+          .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
+    }
+
     _zkClient.setZkSerializer(new ZkSerializer() {
       @Override
       public byte[] serialize(Object data) throws ZkMarshallingError {