You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2019/11/12 20:53:24 UTC

[helix] branch master updated: Update ZkBaseDataAccessor with custom serializer support (#534)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new f4ebd3b  Update ZkBaseDataAccessor with custom serializer support (#534)
f4ebd3b is described below

commit f4ebd3beedba71458a6bf90eb0f619211cdb5f0a
Author: Yi Wang <i3...@gmail.com>
AuthorDate: Tue Nov 12 12:53:13 2019 -0800

    Update ZkBaseDataAccessor with custom serializer support (#534)
    
    Added a new constructor that takes a custom serializer as a parameter.
    Added a default constructor that uses ZNRecordSerializer.
    Added cross-validating unit tests to verify that the custom and default serializers work correctly.
---
 .../java/org/apache/helix/BaseDataAccessor.java    |   5 ++
 .../helix/manager/zk/ZkBaseDataAccessor.java       |  32 +++++++
 .../helix/manager/zk/ZkCacheBaseDataAccessor.java  |   7 ++
 .../helix/manager/zk/TestZkBaseDataAccessor.java   | 100 ++++++++++++++++++++-
 .../apache/helix/mock/MockBaseDataAccessor.java    |   3 +
 5 files changed, 145 insertions(+), 2 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
index 7d024fa..4853195 100644
--- a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
@@ -257,4 +257,9 @@ public interface BaseDataAccessor<T> {
    * reset the cache if any, when session expiry happens
    */
   void reset();
+
+  /**
+   * Close the connection to the metadata store
+   */
+  void close();
 }
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index a4dfe21..4cd53e0 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+
 import org.I0Itec.zkclient.DataUpdater;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
@@ -33,6 +34,7 @@ import org.I0Itec.zkclient.exception.ZkBadVersionException;
 import org.I0Itec.zkclient.exception.ZkException;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixException;
@@ -43,6 +45,7 @@ import org.apache.helix.manager.zk.ZkAsyncCallbacks.ExistsCallbackHandler;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks.GetDataCallbackHandler;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks.SetDataCallbackHandler;
 import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.store.zk.ZNode;
 import org.apache.helix.util.HelixUtil;
 import org.apache.zookeeper.CreateMode;
@@ -93,6 +96,25 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
   }
 
   /**
+   * The ZkBaseDataAccessor with custom serializer support
+   * @param zkAddress The zookeeper address
+   */
+  public ZkBaseDataAccessor(String zkAddress, ZkSerializer zkSerializer) {
+    _zkClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
+            new HelixZkClient.ZkClientConfig().setZkSerializer(zkSerializer));
+  }
+
+  /**
+   * The default ZkBaseDataAccessor with {@link org.apache.helix.ZNRecord} as the data model;
+   * Uses {@link ZNRecordSerializer} serializer
+   * @param zkAddress The zookeeper address
+   */
+  public ZkBaseDataAccessor(String zkAddress) {
+    this(zkAddress, new ZNRecordSerializer());
+  }
+
+  /**
    * sync create
    */
   @Override
@@ -1114,4 +1136,14 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
   public void reset() {
     // Nothing to do
   }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void close() {
+    if (_zkClient != null) {
+      _zkClient.close();
+    }
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
index 67bf46e..8d27063 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
@@ -820,4 +820,11 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
       _zkCache.reset();
     }
   }
+
+  @Override
+  public void close() {
+    if (_zkclient != null) {
+      _zkclient.close();
+    }
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
index 671ce80..a623680 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
@@ -20,9 +20,15 @@ package org.apache.helix.manager.zk;
  */
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableList;
 import org.I0Itec.zkclient.DataUpdater;
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.PropertyPathBuilder;
@@ -39,6 +45,24 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
 public class TestZkBaseDataAccessor extends ZkUnitTestBase {
+  // Serializes an integer list to comma-separated text bytes and deserializes it back
+  private static final ZkSerializer LIST_SERIALIZER = new ZkSerializer() {
+    @Override
+    public byte[] serialize(Object o)
+        throws ZkMarshallingError {
+      List<Integer> list = (List<Integer>) o;
+      return list.stream().map(String::valueOf).collect(Collectors.joining(","))
+          .getBytes();
+    }
+
+    @Override
+    public Object deserialize(byte[] bytes)
+        throws ZkMarshallingError {
+      String string = new String(bytes);
+      return Arrays.stream(string.split(",")).map(Integer::valueOf)
+          .collect(Collectors.toList());
+    }
+  };
   String _rootPath = TestHelper.getTestClassName();
 
 
@@ -51,8 +75,8 @@ public class TestZkBaseDataAccessor extends ZkUnitTestBase {
   }
 
   @AfterClass
-  public void after() {
-    int a =1;
+  public void afterClass() {
+    _gZkClient.close();
   }
 
   @Test
@@ -189,6 +213,78 @@ public class TestZkBaseDataAccessor extends ZkUnitTestBase {
   }
 
   @Test
+  public void testDefaultAccessorCreateCustomData() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+    String path = String.format("/%s/%s", _rootPath, "msg_0");
+
+    ZkBaseDataAccessor defaultAccessor = new ZkBaseDataAccessor(ZK_ADDR);
+
+    List<Integer> l0 = ImmutableList.of(1, 2, 3);
+    boolean createResult = defaultAccessor.create(path, l0, AccessOption.PERSISTENT);
+    // The result is expected to be false because the list is not a ZNRecord
+    Assert.assertFalse(createResult);
+    createResult = defaultAccessor.create(path, new ZNRecord("test"), AccessOption.PERSISTENT);
+    // The result is expected to be true
+    Assert.assertTrue(createResult);
+
+    defaultAccessor.close();
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testCustomAccessorCreateZnRecord() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+    String path = String.format("/%s/%s", _rootPath, "msg_0");
+
+    ZkBaseDataAccessor customDataAccessor = new ZkBaseDataAccessor(ZK_ADDR, LIST_SERIALIZER);
+    boolean createResult = customDataAccessor.create(path, new ZNRecord("test"), AccessOption.PERSISTENT);
+    // The result is expected to be false because the ZNRecord is not a List
+    Assert.assertFalse(createResult);
+    createResult = customDataAccessor.create(path, ImmutableList.of(1, 2, 3), AccessOption.PERSISTENT);
+    // The result is expected to be true
+    Assert.assertTrue(createResult);
+
+    customDataAccessor.close();
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testSyncCreateWithCustomSerializer() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+    String path = String.format("/%s/%s", _rootPath, "msg_0");
+
+    ZkBaseDataAccessor<List<Integer>> accessor = new ZkBaseDataAccessor<>(ZK_ADDR, LIST_SERIALIZER);
+
+    List<Integer> l0 = ImmutableList.of(1, 2, 3);
+    List<Integer> l1 = ImmutableList.of(4, 5, 6);
+    boolean createResult = accessor.create(path, l0, AccessOption.PERSISTENT);
+    Assert.assertTrue(createResult);
+
+    List<Integer> data = (List<Integer>) accessor.get(path, null, AccessOption.PERSISTENT);
+    Assert.assertEquals(data, l0);
+    boolean setResult = accessor.set(path, l1, 0, AccessOption.PERSISTENT);
+    Assert.assertTrue(setResult);
+
+    data = (List<Integer>) accessor.get(path, null, AccessOption.PERSISTENT);
+    Assert.assertEquals(data, l1);
+
+    accessor.close();
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
   public void testSyncUpdate() {
     String className = TestHelper.getTestClassName();
     String methodName = TestHelper.getTestMethodName();
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java b/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java
index df468b3..8403b3b 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java
@@ -258,6 +258,9 @@ public class MockBaseDataAccessor implements BaseDataAccessor<ZNRecord> {
     _recordMap.clear();
   }
 
+  @Override
+  public void close() { }
+
   @Override public boolean set(String path, ZNRecord record, int options, int expectVersion) {
     return set(path, record, options);
   }