Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/28 22:33:18 UTC
[helix] 40/50: Increase parallelism for ZkBucketDataAccessor (#506)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 48d272cc84f62a995884a77ef6e9b101daf62256
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Fri Oct 18 18:41:29 2019 -0700
Increase parallelism for ZkBucketDataAccessor (#506)
* Increase parallelism for ZkBucketDataAccessor
This diff improves parallelism and throughput for ZkBucketDataAccessor by implementing the following ideas (a minimal illustrative sketch follows the list):
1. Optimistic Concurrency Control
2. Monotonically Increasing Version Number
3. Garbage Collection of Stale Metadata
4. Retrying Reads Upon Failure
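For illustration, a minimal self-contained sketch of how ideas 1 and 2 fit together (names are
illustrative, not Helix APIs): each writer reserves a version by atomically incrementing a shared
counter with a compare-and-swap loop, which is what the DataUpdater against the LAST_WRITE ZNode
accomplishes in the diff below, so concurrent writers no longer serialize on a lock.

    import java.util.concurrent.atomic.AtomicReference;

    // Sketch only: AtomicReference stands in for the LAST_WRITE ZNode, and
    // compareAndSet stands in for ZooKeeper's versioned (CAS-style) setData.
    public final class VersionCounter {
      private final AtomicReference<Long> lastWrite = new AtomicReference<>(null);

      /** Reserve the next monotonically increasing version for this writer. */
      public long reserveVersion() {
        while (true) {
          Long current = lastWrite.get();
          long next = (current == null) ? 0L : current + 1L;
          if (lastWrite.compareAndSet(current, next)) {
            return next; // this writer now exclusively owns version "next"
          }
          // Another writer won the race; re-read and retry
        }
      }
    }

Each writer then writes its buckets under its own reserved version path, so writers never touch
each other's data; only the small last-successful-write pointer is contended.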
---
.../helix/manager/zk/ZkBucketDataAccessor.java | 431 ++++++++++++---------
.../helix/manager/zk/TestZkBucketDataAccessor.java | 174 ++++-----
2 files changed, 307 insertions(+), 298 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
index a11da29..bc13471 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
@@ -19,56 +19,60 @@ package org.apache.helix.manager.zk;
* under the License.
*/
+import com.google.common.collect.ImmutableMap;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.I0Itec.zkclient.DataUpdater;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.helix.AccessOption;
-import org.apache.helix.BaseDataAccessor;
import org.apache.helix.BucketDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
import org.apache.helix.manager.zk.client.HelixZkClient;
-import org.apache.helix.manager.zk.client.SharedZkClientFactory;
import org.apache.helix.util.GZipCompressionUtil;
+import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
- private static Logger LOG = LoggerFactory.getLogger(ZkBucketDataAccessor.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ZkBucketDataAccessor.class);
- private static final int DEFAULT_NUM_VERSIONS = 2;
+ private static final int DEFAULT_BUCKET_SIZE = 50 * 1024; // 50KB
+ private static final long DEFAULT_VERSION_TTL = TimeUnit.MINUTES.toMillis(1L); // 1 min
private static final String BUCKET_SIZE_KEY = "BUCKET_SIZE";
private static final String DATA_SIZE_KEY = "DATA_SIZE";
- private static final String WRITE_LOCK_KEY = "WRITE_LOCK";
- private static final String LAST_SUCCESS_KEY = "LAST_SUCCESS";
+ private static final String METADATA_KEY = "METADATA";
+ private static final String LAST_SUCCESSFUL_WRITE_KEY = "LAST_SUCCESSFUL_WRITE";
+ private static final String LAST_WRITE_KEY = "LAST_WRITE";
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ // Thread pool for deleting stale versions
+ private static final ScheduledExecutorService GC_THREAD = Executors.newScheduledThreadPool(1);
- // 100 KB for default bucket size
- private static final int DEFAULT_BUCKET_SIZE = 50 * 1024;
private final int _bucketSize;
- private final int _numVersions;
+ private final long _versionTTL;
private ZkSerializer _zkSerializer;
private HelixZkClient _zkClient;
- private HelixZkClient _znRecordClient;
- private BaseDataAccessor _zkBaseDataAccessor;
- private BaseDataAccessor<ZNRecord> _znRecordBaseDataAccessor;
+ private ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor;
/**
* Constructor that allows a custom bucket size.
* @param zkAddr
* @param bucketSize
- * @param numVersions number of versions to store in ZK
+ * @param versionTTL in ms
*/
- public ZkBucketDataAccessor(String zkAddr, int bucketSize, int numVersions) {
- // There are two HelixZkClients:
- // 1. _zkBaseDataAccessor for writes of binary data
- // 2. _znRecordBaseDataAccessor for writes of ZNRecord (metadata)
+ public ZkBucketDataAccessor(String zkAddr, int bucketSize, long versionTTL) {
_zkClient = DedicatedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
_zkClient.setZkSerializer(new ZkSerializer() {
@@ -85,20 +89,10 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
return data;
}
});
- _zkBaseDataAccessor = new ZkBaseDataAccessor(_zkClient);
-
- // TODO: Optimize serialization with Jackson
- // TODO: Or use a better binary serialization protocol
- // TODO: Consider making this also binary
- // TODO: Consider an async write for the metadata as well
- _znRecordClient = SharedZkClientFactory.getInstance()
- .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
- _znRecordBaseDataAccessor = new ZkBaseDataAccessor<>(_znRecordClient);
- _znRecordClient.setZkSerializer(new ZNRecordSerializer());
-
+ _zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
_zkSerializer = new ZNRecordJacksonSerializer();
_bucketSize = bucketSize;
- _numVersions = numVersions;
+ _versionTTL = versionTTL;
}
/**
@@ -106,82 +100,109 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
* @param zkAddr
*/
public ZkBucketDataAccessor(String zkAddr) {
- this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_NUM_VERSIONS);
+ this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_VERSION_TTL);
}
@Override
- public <T extends HelixProperty> boolean compressedBucketWrite(String path, T value)
+ public <T extends HelixProperty> boolean compressedBucketWrite(String rootPath, T value)
throws IOException {
- // Take the ZNrecord and serialize it (get byte[])
+ DataUpdater<byte[]> lastWriteVersionUpdater = dataInZk -> {
+ if (dataInZk == null || dataInZk.length == 0) {
+ // No last write version exists, so start with 0
+ return "0".getBytes();
+ }
+ // Last write exists, so increment and write it back
+      // NOTE: the String conversion makes the value human-readable in ZK tools (e.g. ZooInspector)
+ String lastWriteVersionStr = new String(dataInZk);
+ long lastWriteVersion = Long.parseLong(lastWriteVersionStr);
+ lastWriteVersion++;
+ return String.valueOf(lastWriteVersion).getBytes();
+ };
+
+ // 1. Increment lastWriteVersion using DataUpdater
+ ZkBaseDataAccessor.AccessResult result = _zkBaseDataAccessor.doUpdate(
+ rootPath + "/" + LAST_WRITE_KEY, lastWriteVersionUpdater, AccessOption.PERSISTENT);
+ if (result._retCode != ZkBaseDataAccessor.RetCode.OK) {
+ throw new HelixException(
+ String.format("Failed to write the write version at path: %s!", rootPath));
+ }
+
+ // Successfully reserved a version number
+ byte[] binaryVersion = (byte[]) result._updatedValue;
+ String versionStr = new String(binaryVersion);
+ final long version = Long.parseLong(versionStr);
+
+ // 2. Write to the incremented last write version
+ String versionedDataPath = rootPath + "/" + versionStr;
+
+ // Take the ZNRecord and serialize it (get byte[])
byte[] serializedRecord = _zkSerializer.serialize(value.getRecord());
// Compress the byte[]
byte[] compressedRecord = GZipCompressionUtil.compress(serializedRecord);
// Compute N - number of buckets
int numBuckets = (compressedRecord.length + _bucketSize - 1) / _bucketSize;
- if (tryLock(path)) {
- try {
- // Read or initialize metadata and compute the last success version index
- ZNRecord metadataRecord =
- _znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
- if (metadataRecord == null) {
- metadataRecord = new ZNRecord(extractIdFromPath(path));
- }
- int lastSuccessIndex =
- (metadataRecord.getIntField(LAST_SUCCESS_KEY, -1) + 1) % _numVersions;
- String dataPath = path + "/" + lastSuccessIndex;
-
- List<String> paths = new ArrayList<>();
- List<Object> buckets = new ArrayList<>();
-
- int ptr = 0;
- int counter = 0;
- while (counter < numBuckets) {
- paths.add(dataPath + "/" + counter);
- if (counter == numBuckets - 1) {
- // Special treatment for the last bucket
- buckets.add(Arrays.copyOfRange(compressedRecord, ptr,
- ptr + compressedRecord.length % _bucketSize));
- } else {
- buckets.add(Arrays.copyOfRange(compressedRecord, ptr, ptr + _bucketSize));
- }
- ptr += _bucketSize;
- counter++;
- }
-
- // Do a cleanup of previous data
- if (!_zkBaseDataAccessor.remove(dataPath, AccessOption.PERSISTENT)) {
- // Clean-up is not critical so upon failure, we log instead of throwing an exception
- LOG.warn("Failed to clean up previous bucketed data in data path: {}", dataPath);
- }
+ List<String> paths = new ArrayList<>();
+ List<byte[]> buckets = new ArrayList<>();
+
+ int ptr = 0;
+ int counter = 0;
+ while (counter < numBuckets) {
+ paths.add(versionedDataPath + "/" + counter);
+      if (counter == numBuckets - 1) {
+        // The last bucket may be partial, so copy only the bytes that remain
+        // (avoids a zero-length copy when the data size is an exact multiple of the bucket size)
+        buckets.add(Arrays.copyOfRange(compressedRecord, ptr, compressedRecord.length));
+ } else {
+ buckets.add(Arrays.copyOfRange(compressedRecord, ptr, ptr + _bucketSize));
+ }
+ ptr += _bucketSize;
+ counter++;
+ }
- // Do an async set to ZK
- boolean[] success =
- _zkBaseDataAccessor.setChildren(paths, buckets, AccessOption.PERSISTENT);
- // Exception and fail the write if any failed
- for (boolean s : success) {
- if (!s) {
- throw new HelixException(
- String.format("Failed to write the data buckets for path: %s", path));
- }
- }
+ // 3. Include the metadata in the batch write
+ Map<String, String> metadata = ImmutableMap.of(BUCKET_SIZE_KEY, Integer.toString(_bucketSize),
+ DATA_SIZE_KEY, Integer.toString(compressedRecord.length));
+ byte[] binaryMetadata = OBJECT_MAPPER.writeValueAsBytes(metadata);
+ paths.add(versionedDataPath + "/" + METADATA_KEY);
+ buckets.add(binaryMetadata);
+
+ // Do an async set to ZK
+ boolean[] success = _zkBaseDataAccessor.setChildren(paths, buckets, AccessOption.PERSISTENT);
+    // Throw and fail the write if any bucket write failed
+ for (boolean s : success) {
+ if (!s) {
+ throw new HelixException(
+ String.format("Failed to write the data buckets for path: %s", rootPath));
+ }
+ }
- // Data write completed, so update the metadata with last success index
- // Note that the metadata ZNodes is written using sync write
- metadataRecord.setIntField(BUCKET_SIZE_KEY, _bucketSize);
- metadataRecord.setLongField(DATA_SIZE_KEY, compressedRecord.length);
- metadataRecord.setIntField(LAST_SUCCESS_KEY, lastSuccessIndex);
- if (!_znRecordBaseDataAccessor.set(path, metadataRecord, AccessOption.PERSISTENT)) {
- throw new HelixException(
- String.format("Failed to write the metadata at path: %s!", path));
- }
- } finally {
- // Critical section for write ends here
- unlock(path);
+ // 4. Update lastSuccessfulWriteVersion using Updater
+ DataUpdater<byte[]> lastSuccessfulWriteVersionUpdater = dataInZk -> {
+ if (dataInZk == null || dataInZk.length == 0) {
+        // No last successful write version exists, so record this write's version
+ return versionStr.getBytes();
}
- return true;
+    // A last successful write exists, so check whether it is smaller than this write's version
+ String lastWriteVersionStr = new String(dataInZk);
+ long lastWriteVersion = Long.parseLong(lastWriteVersionStr);
+ if (lastWriteVersion < version) {
+ // Smaller, so I can overwrite
+ return versionStr.getBytes();
+ } else {
+      // Greater or equal: this write has lagged behind, so return null and do not write
+ return null;
+ }
+ };
+ if (!_zkBaseDataAccessor.update(rootPath + "/" + LAST_SUCCESSFUL_WRITE_KEY,
+ lastSuccessfulWriteVersionUpdater, AccessOption.PERSISTENT)) {
+ throw new HelixException(String
+ .format("Failed to write the last successful write metadata at path: %s!", rootPath));
}
- throw new HelixException(String.format("Could not acquire lock for write. Path: %s", path));
+
+ // 5. Update the timer for GC
+ updateGCTimer(rootPath, versionStr);
+ return true;
}
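To make the bucketing arithmetic concrete, a small illustration assuming the 50 KB default bucket
size (the values are hypothetical):

    int bucketSize = 50 * 1024;  // 51200 bytes, DEFAULT_BUCKET_SIZE
    int dataSize = 120 * 1024;   // a 120 KB compressed record
    // Ceiling division: (122880 + 51199) / 51200 = 3 buckets
    int numBuckets = (dataSize + bucketSize - 1) / bucketSize;
    // Buckets 0 and 1 hold 51200 bytes each; the last holds the 20480-byte remainder
    int lastBucketBytes = dataSize - (numBuckets - 1) * bucketSize;

The metadata ZNode records BUCKET_SIZE and DATA_SIZE precisely so a reader can redo this
arithmetic and know how many bucket ZNodes to fetch and how large the last one is.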
@Override
@@ -202,126 +223,158 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
if (!_zkClient.isClosed()) {
_zkClient.close();
}
- if (!_znRecordClient.isClosed()) {
- _znRecordClient.close();
- }
}
private HelixProperty compressedBucketRead(String path) {
- // TODO: Incorporate parallelism into reads instead of locking the whole thing against other
- // reads and writes
- if (tryLock(path)) {
- try {
- // Retrieve the metadata
- ZNRecord metadataRecord =
- _znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
- if (metadataRecord == null) {
- throw new ZkNoNodeException(
- String.format("Metadata ZNRecord does not exist for path: %s", path));
- }
+ // 1. Get the version to read
+ byte[] binaryVersionToRead = _zkBaseDataAccessor.get(path + "/" + LAST_SUCCESSFUL_WRITE_KEY,
+ null, AccessOption.PERSISTENT);
+ if (binaryVersionToRead == null) {
+ throw new ZkNoNodeException(
+ String.format("Last successful write ZNode does not exist for path: %s", path));
+ }
+ String versionToRead = new String(binaryVersionToRead);
+
+ // 2. Get the metadata map
+ byte[] binaryMetadata = _zkBaseDataAccessor.get(path + "/" + versionToRead + "/" + METADATA_KEY,
+ null, AccessOption.PERSISTENT);
+ if (binaryMetadata == null) {
+ throw new ZkNoNodeException(
+ String.format("Metadata ZNode does not exist for path: %s", path));
+ }
+ Map metadata;
+ try {
+ metadata = OBJECT_MAPPER.readValue(binaryMetadata, Map.class);
+ } catch (IOException e) {
+ throw new HelixException(String.format("Failed to deserialize path metadata: %s!", path), e);
+ }
- int bucketSize = metadataRecord.getIntField(BUCKET_SIZE_KEY, -1);
- int dataSize = metadataRecord.getIntField(DATA_SIZE_KEY, -1);
- int lastSuccessIndex = metadataRecord.getIntField(LAST_SUCCESS_KEY, -1);
- if (lastSuccessIndex == -1) {
- throw new HelixException(String.format("Metadata ZNRecord does not have %s! Path: %s",
- LAST_SUCCESS_KEY, path));
- }
- if (bucketSize == -1) {
- throw new HelixException(
- String.format("Metadata ZNRecord does not have %s! Path: %s", BUCKET_SIZE_KEY, path));
- }
- if (dataSize == -1) {
- throw new HelixException(
- String.format("Metadata ZNRecord does not have %s! Path: %s", DATA_SIZE_KEY, path));
- }
+ // 3. Read the data
+ Object bucketSizeObj = metadata.get(BUCKET_SIZE_KEY);
+ Object dataSizeObj = metadata.get(DATA_SIZE_KEY);
+ if (bucketSizeObj == null) {
+ throw new HelixException(
+ String.format("Metadata ZNRecord does not have %s! Path: %s", BUCKET_SIZE_KEY, path));
+ }
+ if (dataSizeObj == null) {
+ throw new HelixException(
+ String.format("Metadata ZNRecord does not have %s! Path: %s", DATA_SIZE_KEY, path));
+ }
+ int bucketSize = Integer.parseInt((String) bucketSizeObj);
+ int dataSize = Integer.parseInt((String) dataSizeObj);
- // Compute N - number of buckets
- int numBuckets = (dataSize + _bucketSize - 1) / _bucketSize;
- byte[] compressedRecord = new byte[dataSize];
- String dataPath = path + "/" + lastSuccessIndex;
+    // Compute N - number of buckets, using the bucket size recorded in the metadata
+    int numBuckets = (dataSize + bucketSize - 1) / bucketSize;
+ byte[] compressedRecord = new byte[dataSize];
+ String dataPath = path + "/" + versionToRead;
- List<String> paths = new ArrayList<>();
- for (int i = 0; i < numBuckets; i++) {
- paths.add(dataPath + "/" + i);
- }
+ List<String> paths = new ArrayList<>();
+ for (int i = 0; i < numBuckets; i++) {
+ paths.add(dataPath + "/" + i);
+ }
- // Async get
- List buckets = _zkBaseDataAccessor.get(paths, null, AccessOption.PERSISTENT, true);
-
- // Combine buckets into one byte array
- int copyPtr = 0;
- for (int i = 0; i < numBuckets; i++) {
- if (i == numBuckets - 1) {
- // Special treatment for the last bucket
- System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, dataSize % bucketSize);
- } else {
- System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, bucketSize);
- copyPtr += bucketSize;
- }
- }
+ // Async get
+ List<byte[]> buckets = _zkBaseDataAccessor.get(paths, null, AccessOption.PERSISTENT, true);
+
+ // Combine buckets into one byte array
+ int copyPtr = 0;
+ for (int i = 0; i < numBuckets; i++) {
+      if (i == numBuckets - 1) {
+        // The last bucket may be partial, so copy only the bytes that remain
+        System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, dataSize - copyPtr);
+ } else {
+ System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, bucketSize);
+ copyPtr += bucketSize;
+ }
+ }
- // Decompress the byte array
- ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressedRecord);
- byte[] serializedRecord;
- try {
- serializedRecord = GZipCompressionUtil.uncompress(byteArrayInputStream);
- } catch (IOException e) {
- throw new HelixException(String.format("Failed to decompress path: %s!", path), e);
- }
+ // Decompress the byte array
+ ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressedRecord);
+ byte[] serializedRecord;
+ try {
+ serializedRecord = GZipCompressionUtil.uncompress(byteArrayInputStream);
+ } catch (IOException e) {
+ throw new HelixException(String.format("Failed to decompress path: %s!", path), e);
+ }
+
+ // Deserialize the record to retrieve the original
+ ZNRecord originalRecord = (ZNRecord) _zkSerializer.deserialize(serializedRecord);
+ return new HelixProperty(originalRecord);
+ }
+
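Idea 4, retrying reads upon failure, is not visible in this hunk; a generic sketch of the pattern
(an illustrative helper, not the commit's code) might look like:

    // Retry a read a bounded number of times; a read can race with the GC task
    // deleting the version it resolved, in which case a fresh read resolves the
    // newer LAST_SUCCESSFUL_WRITE version.
    static <T> T readWithRetries(java.util.function.Supplier<T> read, int maxAttempts) {
      RuntimeException lastFailure = null;
      for (int attempt = 0; attempt < maxAttempts; attempt++) {
        try {
          return read.get();
        } catch (RuntimeException e) {
          lastFailure = e;
        }
      }
      throw lastFailure;
    }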
+ @Override
+ public void close() {
+ disconnect();
+ }
- // Deserialize the record to retrieve the original
- ZNRecord originalRecord = (ZNRecord) _zkSerializer.deserialize(serializedRecord);
- return new HelixProperty(originalRecord);
- } finally {
- // Critical section for read ends here
- unlock(path);
+ private void updateGCTimer(String rootPath, String currentVersion) {
+ TimerTask gcTask = new TimerTask() {
+ @Override
+ public void run() {
+ deleteStaleVersions(rootPath, currentVersion);
}
- }
- throw new HelixException(String.format("Could not acquire lock for read. Path: %s", path));
+ };
+
+ // Schedule the gc task with TTL
+ GC_THREAD.schedule(gcTask, _versionTTL, TimeUnit.MILLISECONDS);
}
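The GC above is a plain one-shot scheduled task; a self-contained sketch of the same pattern
(illustrative names, not the commit's code):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public final class TtlGcSketch {
      private static final ScheduledExecutorService GC = Executors.newScheduledThreadPool(1);

      /** After ttlMillis, delete everything strictly older than currentVersion. */
      static void scheduleGc(long currentVersion, long ttlMillis) {
        GC.schedule(() -> {
          // Placeholder for deleteStaleVersions(rootPath, currentVersion)
          System.out.println("GC: removing versions older than " + currentVersion);
        }, ttlMillis, TimeUnit.MILLISECONDS);
      }
    }

Because every successful write schedules its own task, a superseded version is kept for at least
_versionTTL after a newer version lands, which keeps in-flight readers of the old version safe.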
/**
- * Returns the last string element in a split String array by /.
- * @param path
- * @return
+ * Deletes all stale versions.
+ * @param rootPath
+ * @param currentVersion
*/
- private String extractIdFromPath(String path) {
- String[] splitPath = path.split("/");
- return splitPath[splitPath.length - 1];
+ private void deleteStaleVersions(String rootPath, String currentVersion) {
+ // Get all children names under path
+ List<String> children = _zkBaseDataAccessor.getChildNames(rootPath, AccessOption.PERSISTENT);
+ if (children == null || children.isEmpty()) {
+ // The whole path has been deleted so return immediately
+ return;
+ }
+ filterChildrenNames(children, currentVersion);
+ List<String> pathsToDelete = getPathsToDelete(rootPath, children);
+ for (String pathToDelete : pathsToDelete) {
+      // TODO: This should be a batch delete, but batch delete does not work;
+      // acceptable since this runs asynchronously
+ _zkBaseDataAccessor.remove(pathToDelete, AccessOption.PERSISTENT);
+ }
}
/**
- * Acquires the lock (create an ephemeral node) only if it is free (no ephemeral node already
- * exists) at the time of invocation.
- * @param path
- * @return
+ * Filter out non-version children names and non-stale versions.
+ * @param children
*/
- private boolean tryLock(String path) {
- // Check if another write is taking place and if not, create an ephemeral node to simulate
- // acquiring of a lock
- return !_zkBaseDataAccessor.exists(path + "/" + WRITE_LOCK_KEY, AccessOption.EPHEMERAL)
- && _zkBaseDataAccessor.set(path + "/" + WRITE_LOCK_KEY, new byte[0],
- AccessOption.EPHEMERAL);
+ private void filterChildrenNames(List<String> children, String currentVersion) {
+ // Leave out metadata
+ children.remove(LAST_SUCCESSFUL_WRITE_KEY);
+ children.remove(LAST_WRITE_KEY);
+
+ // Leave out currentVersion and above
+ // This is because we want to honor the TTL for newer versions
+ children.remove(currentVersion);
+ long currentVer = Long.parseLong(currentVersion);
+    // Iterate with an explicit Iterator so removal is safe while traversing the list
+    Iterator<String> iter = children.iterator();
+    while (iter.hasNext()) {
+      String child = iter.next();
+      try {
+        long version = Long.parseLong(child);
+        if (version >= currentVer) {
+          iter.remove();
+        }
+      } catch (NumberFormatException e) {
+        // Ignore ZNode names that aren't parseable as version numbers
+        iter.remove();
+        LOG.debug("Found an invalid ZNode: {}", child);
+      }
+    }
}
/**
- * Releases the lock (removes the ephemeral node).
+ * Generates all stale paths to delete.
* @param path
+ * @param staleVersions
+ * @return
*/
- private void unlock(String path) {
- // Write succeeded, so release the lock
- if (!_zkBaseDataAccessor.remove(path + "/" + WRITE_LOCK_KEY, AccessOption.EPHEMERAL)) {
- throw new HelixException(String.format("Could not remove ephemeral node for path: %s", path));
- }
- // TODO: In case of remove failure, we risk a lock never getting released.
- // TODO: Consider two possible improvements
- // TODO: 1. Use ephemeral owner id for the same connection to reclaim the lock
- // TODO: 2. Use "lease" - lock with a timeout
- }
-
- @Override
- public void close() throws Exception {
- disconnect();
+ private List<String> getPathsToDelete(String path, List<String> staleVersions) {
+ List<String> pathsToDelete = new ArrayList<>();
+ staleVersions.forEach(ver -> pathsToDelete.add(path + "/" + ver));
+ return pathsToDelete;
}
}
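End-to-end, the reworked accessor is used like this (a hypothetical sketch; the ZK address and
names are placeholders, and the calls mirror the tests below):

    // Bucket size 50 KB, version TTL 1 minute
    BucketDataAccessor accessor =
        new ZkBucketDataAccessor("localhost:2181", 50 * 1024, TimeUnit.MINUTES.toMillis(1));
    ZNRecord record = new ZNRecord("myResource");
    record.setSimpleField("key", "value");
    accessor.compressedBucketWrite("/myResource", new HelixProperty(record));
    HelixProperty readBack = accessor.compressedBucketRead("/myResource", HelixProperty.class);
    accessor.compressedBucketDelete("/myResource");
    accessor.disconnect();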
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java
index 4c28835..c7b5cbf 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java
@@ -27,35 +27,64 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
import org.apache.helix.BucketDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TestZkBucketDataAccessor extends ZkTestBase {
-
private static final String PATH = "/" + TestHelper.getTestClassName();
private static final String NAME_KEY = TestHelper.getTestClassName();
- private static final String LAST_SUCCESS_KEY = "LAST_SUCCESS";
- private static final String BUCKET_SIZE_KEY = "BUCKET_SIZE";
- private static final String WRITE_LOCK_KEY = "WRITE_LOCK";
+ private static final String LAST_SUCCESSFUL_WRITE_KEY = "LAST_SUCCESSFUL_WRITE";
+ private static final String LAST_WRITE_KEY = "LAST_WRITE";
// Populate list and map fields for content comparison
private static final List<String> LIST_FIELD = ImmutableList.of("1", "2");
private static final Map<String, String> MAP_FIELD = ImmutableMap.of("1", "2");
private BucketDataAccessor _bucketDataAccessor;
+ private BaseDataAccessor<byte[]> _zkBaseDataAccessor;
+
+ private ZNRecord record = new ZNRecord(NAME_KEY);
@BeforeClass
public void beforeClass() {
- _bucketDataAccessor = new ZkBucketDataAccessor(ZK_ADDR);
+ // Initialize ZK accessors for testing
+ _bucketDataAccessor = new ZkBucketDataAccessor(ZK_ADDR, 50 * 1024, 0L);
+ HelixZkClient zkClient = DedicatedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
+ zkClient.setZkSerializer(new ZkSerializer() {
+ @Override
+ public byte[] serialize(Object data) throws ZkMarshallingError {
+ if (data instanceof byte[]) {
+ return (byte[]) data;
+ }
+ throw new HelixException("ZkBucketDataAccesor only supports a byte array as an argument!");
+ }
+
+ @Override
+ public Object deserialize(byte[] data) throws ZkMarshallingError {
+ return data;
+ }
+ });
+ _zkBaseDataAccessor = new ZkBaseDataAccessor<>(zkClient);
+
+ // Fill in some data for the record
+ record.setSimpleField(NAME_KEY, NAME_KEY);
+ record.setListField(NAME_KEY, LIST_FIELD);
+ record.setMapField(NAME_KEY, MAP_FIELD);
}
@AfterClass
@@ -69,17 +98,42 @@ public class TestZkBucketDataAccessor extends ZkTestBase {
*/
@Test
public void testCompressedBucketWrite() throws IOException {
- ZNRecord record = new ZNRecord(NAME_KEY);
- record.setSimpleField(NAME_KEY, NAME_KEY);
- record.setListField(NAME_KEY, LIST_FIELD);
- record.setMapField(NAME_KEY, MAP_FIELD);
Assert.assertTrue(_bucketDataAccessor.compressedBucketWrite(PATH, new HelixProperty(record)));
}
+ @Test(dependsOnMethods = "testCompressedBucketWrite")
+ public void testMultipleWrites() throws Exception {
+ int count = 50;
+
+ // Write "count" times
+ for (int i = 0; i < count; i++) {
+ _bucketDataAccessor.compressedBucketWrite(PATH, new HelixProperty(record));
+ }
+
+ // Last known good version number should be "count"
+ byte[] binarySuccessfulWriteVer = _zkBaseDataAccessor
+ .get(PATH + "/" + LAST_SUCCESSFUL_WRITE_KEY, null, AccessOption.PERSISTENT);
+ long lastSuccessfulWriteVer = Long.parseLong(new String(binarySuccessfulWriteVer));
+ Assert.assertEquals(lastSuccessfulWriteVer, count);
+
+ // Last write version should be "count"
+ byte[] binaryWriteVer =
+ _zkBaseDataAccessor.get(PATH + "/" + LAST_WRITE_KEY, null, AccessOption.PERSISTENT);
+ long writeVer = Long.parseLong(new String(binaryWriteVer));
+ Assert.assertEquals(writeVer, count);
+
+ // Test that all previous versions have been deleted
+    // Use a Verifier because GC runs asynchronously and is subject to ZK latency
+ Assert.assertTrue(TestHelper.verify(() -> {
+ List<String> children = _zkBaseDataAccessor.getChildNames(PATH, AccessOption.PERSISTENT);
+ return children.size() == 3;
+ }, 60 * 1000L));
+ }
+
/**
* Verifies that the record read back matches the record written in {@link #testCompressedBucketWrite()}.
*/
- @Test(dependsOnMethods = "testCompressedBucketWrite")
+ @Test(dependsOnMethods = "testMultipleWrites")
public void testCompressedBucketRead() {
HelixProperty readRecord = _bucketDataAccessor.compressedBucketRead(PATH, HelixProperty.class);
Assert.assertEquals(readRecord.getRecord().getSimpleField(NAME_KEY), NAME_KEY);
@@ -89,42 +143,9 @@ public class TestZkBucketDataAccessor extends ZkTestBase {
}
/**
- * Do 10 writes and check that there are 5 versions of the data.
- */
- @Test(dependsOnMethods = "testCompressedBucketRead")
- public void testManyWritesWithVersionCounts() throws IOException {
- int bucketSize = 50 * 1024;
- int numVersions = 5;
- int expectedLastSuccessfulIndex = 4;
- String path = PATH + "2";
- ZNRecord record = new ZNRecord(NAME_KEY);
- record.setSimpleField(NAME_KEY, NAME_KEY);
- record.setListField(NAME_KEY, LIST_FIELD);
- record.setMapField(NAME_KEY, MAP_FIELD);
-
- BucketDataAccessor bucketDataAccessor =
- new ZkBucketDataAccessor(ZK_ADDR, bucketSize, numVersions);
- for (int i = 0; i < 10; i++) {
- bucketDataAccessor.compressedBucketWrite(path, new HelixProperty(record));
- }
-
- // Check that there are numVersions number of children under path
- List<String> children = _baseAccessor.getChildNames(path, AccessOption.PERSISTENT);
- Assert.assertEquals(children.size(), numVersions);
-
- // Check that last successful index is 4 (since we did 10 writes)
- ZNRecord metadata = _baseAccessor.get(path, null, AccessOption.PERSISTENT);
- Assert.assertEquals(metadata.getIntField(LAST_SUCCESS_KEY, -1), expectedLastSuccessfulIndex);
-
- // Clean up
- bucketDataAccessor.compressedBucketDelete(path);
- bucketDataAccessor.disconnect();
- }
-
- /**
* Write a HelixProperty with large number of entries using BucketDataAccessor and read it back.
*/
- @Test(dependsOnMethods = "testManyWritesWithVersionCounts")
+ @Test(dependsOnMethods = "testCompressedBucketRead")
public void testLargeWriteAndRead() throws IOException {
String name = "largeResourceAssignment";
HelixProperty property = createLargeHelixProperty(name, 100000);
@@ -146,71 +167,6 @@ public class TestZkBucketDataAccessor extends ZkTestBase {
Assert.assertEquals(readRecord, property);
}
- /**
- * Tests that each write cleans up previous bucketed data. This method writes some small amount of
- * data and checks that the data buckets from the large write performed in the previous test
- * method have been cleaned up.
- * @throws IOException
- */
- @Test(dependsOnMethods = "testLargeWriteAndRead")
- public void testCleanupBeforeWrite() throws IOException {
- // Create a HelixProperty of a very small size with the same name as the large HelixProperty
- // created from the previous method
- String name = "largeResourceAssignment";
- HelixProperty property = new HelixProperty(name);
- property.getRecord().setIntField("Hi", 10);
-
- // Get the bucket count from the write performed in the previous method
- ZNRecord metadata = _baseAccessor.get("/" + name, null, AccessOption.PERSISTENT);
- int origBucketSize = metadata.getIntField(BUCKET_SIZE_KEY, -1);
-
- // Perform a write twice to overwrite both versions
- _bucketDataAccessor.compressedBucketWrite("/" + name, property);
- _bucketDataAccessor.compressedBucketWrite("/" + name, property);
-
- // Check that the children count for version 0 (version for the large write) is 1
- Assert.assertEquals(
- _baseAccessor.getChildNames("/" + name + "/0", AccessOption.PERSISTENT).size(), 1);
-
- // Clean up
- _bucketDataAccessor.compressedBucketDelete("/" + name);
- }
-
- /**
- * Test that no concurrent reads and writes are allowed by triggering multiple operations after
- * creating an artificial lock.
- * @throws IOException
- */
- @Test(dependsOnMethods = "testCleanupBeforeWrite")
- public void testFailureToAcquireLock() throws Exception {
- String name = "acquireLock";
- // Use a large HelixProperty to simulate a write that keeps the lock for some time
- HelixProperty property = createLargeHelixProperty(name, 100);
-
- // Artificially create the ephemeral ZNode
- _baseAccessor.create("/" + name + "/" + WRITE_LOCK_KEY, new ZNRecord(name),
- AccessOption.EPHEMERAL);
-
- // Test write
- try {
- _bucketDataAccessor.compressedBucketWrite("/" + name, property);
- Assert.fail("Should fail due to an already-existing lock ZNode!");
- } catch (HelixException e) {
- // Expect an exception
- }
-
- // Test read
- try {
- _bucketDataAccessor.compressedBucketRead("/" + name, HelixProperty.class);
- Assert.fail("Should fail due to an already-existing lock ZNode!");
- } catch (HelixException e) {
- // Expect an exception
- }
-
- // Clean up
- _bucketDataAccessor.compressedBucketDelete("/" + name);
- }
-
private HelixProperty createLargeHelixProperty(String name, int numEntries) {
HelixProperty property = new HelixProperty(name);
for (int i = 0; i < numEntries; i++) {