You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2019/11/12 20:53:24 UTC
[helix] branch master updated: Update ZkBaseDataAccessor with
custom serializer support (#534)
This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new f4ebd3b Update ZkBaseDataAccessor with custom serializer support (#534)
f4ebd3b is described below
commit f4ebd3beedba71458a6bf90eb0f619211cdb5f0a
Author: Yi Wang <i3...@gmail.com>
AuthorDate: Tue Nov 12 12:53:13 2019 -0800
Update ZkBaseDataAccessor with custom serializer support (#534)
Added a new constructor that takes a custom serializer as a parameter.
Added a default constructor that uses ZNRecordSerializer.
Added cross-validating unit tests to verify that the custom and default serializers work correctly.
---
.../java/org/apache/helix/BaseDataAccessor.java | 5 ++
.../helix/manager/zk/ZkBaseDataAccessor.java | 32 +++++++
.../helix/manager/zk/ZkCacheBaseDataAccessor.java | 7 ++
.../helix/manager/zk/TestZkBaseDataAccessor.java | 100 ++++++++++++++++++++-
.../apache/helix/mock/MockBaseDataAccessor.java | 3 +
5 files changed, 145 insertions(+), 2 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
index 7d024fa..4853195 100644
--- a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
@@ -257,4 +257,9 @@ public interface BaseDataAccessor<T> {
* reset the cache if any, when session expiry happens
*/
void reset();
+
+ /**
+ * Close the connection to the metadata store.
+ * NOTE(review): the contract after close() (whether the accessor may still be used, and
+ * whether close() may safely be called more than once) is not stated here - confirm
+ * against the implementations.
+ */
+ void close();
}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index a4dfe21..4cd53e0 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+
import org.I0Itec.zkclient.DataUpdater;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
@@ -33,6 +34,7 @@ import org.I0Itec.zkclient.exception.ZkBadVersionException;
import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixException;
@@ -43,6 +45,7 @@ import org.apache.helix.manager.zk.ZkAsyncCallbacks.ExistsCallbackHandler;
import org.apache.helix.manager.zk.ZkAsyncCallbacks.GetDataCallbackHandler;
import org.apache.helix.manager.zk.ZkAsyncCallbacks.SetDataCallbackHandler;
import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
import org.apache.helix.store.zk.ZNode;
import org.apache.helix.util.HelixUtil;
import org.apache.zookeeper.CreateMode;
@@ -93,6 +96,25 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
}
/**
+ * The ZkBaseDataAccessor with custom serializer support
+ * @param zkAddress The zookeeper address
+ */
+ public ZkBaseDataAccessor(String zkAddress, ZkSerializer zkSerializer) {
+ _zkClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
+ new HelixZkClient.ZkClientConfig().setZkSerializer(zkSerializer));
+ }
+
+ /**
+ * The default ZkBaseDataAccessor with {@link org.apache.helix.ZNRecord} as the data model.
+ * Delegates to {@link #ZkBaseDataAccessor(String, ZkSerializer)}, passing a new
+ * {@link ZNRecordSerializer} as the serializer.
+ * @param zkAddress The zookeeper address
+ */
+ public ZkBaseDataAccessor(String zkAddress) {
+ this(zkAddress, new ZNRecordSerializer());
+ }
+
+ /**
* sync create
*/
@Override
@@ -1114,4 +1136,14 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
public void reset() {
// Nothing to do
}
+
+ /**
+ * {@inheritDoc}
+ * Closes {@code _zkClient} when it is set; does nothing when it is null.
+ * NOTE(review): if {@code _zkClient} can also be supplied by the caller through another
+ * constructor, this closes a client the accessor does not own - confirm ownership.
+ */
+ @Override
+ public void close() {
+   if (_zkClient == null) {
+     return;
+   }
+   _zkClient.close();
+ }
}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
index 67bf46e..8d27063 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
@@ -820,4 +820,11 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
_zkCache.reset();
}
}
+
+ /**
+ * Closes {@code _zkclient} when it is set; does nothing when it is null.
+ */
+ @Override
+ public void close() {
+   if (_zkclient == null) {
+     return;
+   }
+   _zkclient.close();
+ }
}
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
index 671ce80..a623680 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
@@ -20,9 +20,15 @@ package org.apache.helix.manager.zk;
*/
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableList;
import org.I0Itec.zkclient.DataUpdater;
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.PropertyPathBuilder;
@@ -39,6 +45,24 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
public class TestZkBaseDataAccessor extends ZkUnitTestBase {
+ // Serializes a list of integers as comma-separated decimal text (UTF-8 bytes) and back.
+ // An empty list maps to zero bytes and zero bytes map back to an empty list.
+ private static final ZkSerializer LIST_SERIALIZER = new ZkSerializer() {
+   @Override
+   public byte[] serialize(Object o)
+       throws ZkMarshallingError {
+     // The payload arrives untyped; anything that is not a List fails here with
+     // ClassCastException, which the cross-validation tests rely on.
+     @SuppressWarnings("unchecked")
+     List<Integer> list = (List<Integer>) o;
+     // Use a fixed charset so the stored bytes do not depend on the JVM's platform default.
+     return list.stream().map(String::valueOf).collect(Collectors.joining(","))
+         .getBytes(java.nio.charset.StandardCharsets.UTF_8);
+   }
+
+   @Override
+   public Object deserialize(byte[] bytes)
+       throws ZkMarshallingError {
+     String string = new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
+     // "".split(",") yields [""], and Integer.valueOf("") would throw, so an empty
+     // payload has to be turned into an empty list explicitly.
+     if (string.isEmpty()) {
+       return new ArrayList<Integer>();
+     }
+     return Arrays.stream(string.split(",")).map(Integer::valueOf)
+         .collect(Collectors.toList());
+   }
+ };
String _rootPath = TestHelper.getTestClassName();
@@ -51,8 +75,8 @@ public class TestZkBaseDataAccessor extends ZkUnitTestBase {
}
 @AfterClass
- public void after() {
- int a =1;
+ public void afterClass() {
+ _gZkClient.close(); // NOTE(review): _gZkClient is not declared in the visible part of this class - confirm nothing else uses it after this close
 }
@Test
@@ -189,6 +213,78 @@ public class TestZkBaseDataAccessor extends ZkUnitTestBase {
}
@Test
+ public void testDefaultAccessorCreateCustomData() {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String testName = className + "_" + methodName;
+ System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+ String path = String.format("/%s/%s", _rootPath, "msg_0");
+
+ ZkBaseDataAccessor defaultAccessor = new ZkBaseDataAccessor(ZK_ADDR);
+
+ List<Integer> l0 = ImmutableList.of(1, 2, 3);
+ boolean createResult = defaultAccessor.create(path, l0, AccessOption.PERSISTENT);
+ // The result is expected to be false because the list is not ZNRecord
+ Assert.assertFalse(createResult);
+ createResult = defaultAccessor.create(path, new ZNRecord("test"), AccessOption.PERSISTENT);
+ // The result is expected to be true
+ Assert.assertTrue(createResult);
+
+ defaultAccessor.close();
+ System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testCustomAccessorCreateZnRecord() {
+   // Checks that an accessor built with LIST_SERIALIZER reports failure when asked to create
+   // a ZNRecord payload, and reports success for an integer list at the same path.
+   String className = TestHelper.getTestClassName();
+   String methodName = TestHelper.getTestMethodName();
+   String testName = className + "_" + methodName;
+   System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+   String path = String.format("/%s/%s", _rootPath, "msg_0");
+
+   // Typed as Object so that both a ZNRecord and a List can be passed without a raw type
+   ZkBaseDataAccessor<Object> customDataAccessor =
+       new ZkBaseDataAccessor<>(ZK_ADDR, LIST_SERIALIZER);
+   try {
+     boolean createResult =
+         customDataAccessor.create(path, new ZNRecord("test"), AccessOption.PERSISTENT);
+     // The result is expected to be false because the ZnRecord is not List
+     Assert.assertFalse(createResult);
+     createResult =
+         customDataAccessor.create(path, ImmutableList.of(1, 2, 3), AccessOption.PERSISTENT);
+     // The result is expected to be true
+     Assert.assertTrue(createResult);
+   } finally {
+     // Close the accessor even when an assertion above fails
+     customDataAccessor.close();
+   }
+   System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testSyncCreateWithCustomSerializer() {
+   // Round-trips integer lists through an accessor built with LIST_SERIALIZER:
+   // create, read back, overwrite with set, read back again.
+   String className = TestHelper.getTestClassName();
+   String methodName = TestHelper.getTestMethodName();
+   String testName = className + "_" + methodName;
+   System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+   String path = String.format("/%s/%s", _rootPath, "msg_0");
+
+   ZkBaseDataAccessor<List<Integer>> accessor = new ZkBaseDataAccessor<>(ZK_ADDR, LIST_SERIALIZER);
+   try {
+     List<Integer> l0 = ImmutableList.of(1, 2, 3);
+     List<Integer> l1 = ImmutableList.of(4, 5, 6);
+     boolean createResult = accessor.create(path, l0, AccessOption.PERSISTENT);
+     Assert.assertTrue(createResult);
+
+     // The accessor is already typed, so no cast is needed on the value read back
+     List<Integer> data = accessor.get(path, null, AccessOption.PERSISTENT);
+     Assert.assertEquals(data, l0);
+     boolean setResult = accessor.set(path, l1, 0, AccessOption.PERSISTENT);
+     Assert.assertTrue(setResult);
+
+     data = accessor.get(path, null, AccessOption.PERSISTENT);
+     Assert.assertEquals(data, l1);
+   } finally {
+     // Close the accessor even when an assertion above fails
+     accessor.close();
+   }
+   System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
public void testSyncUpdate() {
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java b/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java
index df468b3..8403b3b 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java
@@ -258,6 +258,9 @@ public class MockBaseDataAccessor implements BaseDataAccessor<ZNRecord> {
_recordMap.clear();
}
+ @Override
+ public void close() { }
+
 // Delegates to the three-argument set; the expectVersion parameter is not used.
 // NOTE(review): callers of BaseDataAccessor.set elsewhere pass the version as the 3rd
 // argument and the options as the 4th, so these two parameter names may be swapped and
 // the value forwarded as "options" may really be the version - confirm against the interface.
 @Override public boolean set(String path, ZNRecord record, int options, int expectVersion) {
 return set(path, record, options);
 }