Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/28 22:32:54 UTC
[helix] 16/50: Add BucketDataAccessor for large writes
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git
commit fdf20c9fa77bd472a33a2effd53a414ea90e842f
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Fri Sep 6 16:34:53 2019 -0700
Add BucketDataAccessor for large writes
For the new WAGED rebalancer, it's necessary to have a data accessor that allows writes of data exceeding 1MB. ZooKeeper caps ZNode size at 1MB, so the BucketDataAccessor interface and its ZkBucketDataAccessor implementation work around this limit by compressing the data and splitting it into buckets (see the usage sketch after the changelist).
Changelist:
1. Add BucketDataAccessor and ZkBucketDataAccessor
2. Add necessary serializers
3. Add an integration test against ZK
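
For reference, a minimal usage sketch of the new API (the ZooKeeper address
"localhost:2181" and the ZNode path "/LARGE_WRITE_TEST" below are hypothetical
placeholders, not part of the commit):

    import java.io.IOException;
    import org.apache.helix.BucketDataAccessor;
    import org.apache.helix.HelixProperty;
    import org.apache.helix.ZNRecord;
    import org.apache.helix.manager.zk.ZkBucketDataAccessor;

    public class BucketWriteSketch {
      public static void main(String[] args) throws IOException {
        BucketDataAccessor accessor = new ZkBucketDataAccessor("localhost:2181");
        try {
          // Build a HelixProperty that may exceed ZooKeeper's 1MB ZNode limit
          ZNRecord record = new ZNRecord("LARGE_WRITE_TEST");
          record.setSimpleField("field", "value");

          // Compress, split into buckets, and write under the given path
          accessor.compressedBucketWrite("/LARGE_WRITE_TEST", new HelixProperty(record));

          // Read the buckets back and reassemble the original property
          HelixProperty read =
              accessor.compressedBucketRead("/LARGE_WRITE_TEST", HelixProperty.class);

          // Remove all bucketed data under the path
          accessor.compressedBucketDelete("/LARGE_WRITE_TEST");
        } finally {
          accessor.disconnect();
        }
      }
    }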
---
.../java/org/apache/helix/BucketDataAccessor.java | 53 ++++
.../manager/zk/ZNRecordJacksonSerializer.java | 67 +++++
.../helix/manager/zk/ZkBucketDataAccessor.java | 326 +++++++++++++++++++++
.../helix/manager/zk/TestZkBucketDataAccessor.java | 233 +++++++++++++++
4 files changed, 679 insertions(+)
diff --git a/helix-core/src/main/java/org/apache/helix/BucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/BucketDataAccessor.java
new file mode 100644
index 0000000..2008c23
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/BucketDataAccessor.java
@@ -0,0 +1,53 @@
+package org.apache.helix;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+
+public interface BucketDataAccessor {
+
+ /**
+ * Write a HelixProperty in buckets, compressed.
+ * @param path path under which the data will be written
+ * @param value HelixProperty to write
+ * @param <T> subtype of HelixProperty
+ * @return true if the write succeeded
+ * @throws IOException if the data could not be compressed
+ */
+ <T extends HelixProperty> boolean compressedBucketWrite(String path, T value) throws IOException;
+
+ /**
+ * Read a HelixProperty that was written in buckets, compressed.
+ * @param path path under which the data was written
+ * @param helixPropertySubType the subtype of HelixProperty the data was written as
+ * @param <T> subtype of HelixProperty
+ * @return the reconstructed HelixProperty
+ */
+ <T extends HelixProperty> HelixProperty compressedBucketRead(String path,
+ Class<T> helixPropertySubType);
+
+ /**
+ * Delete the HelixProperty at the given path.
+ * @param path path under which the data was written
+ */
+ void compressedBucketDelete(String path);
+
+ /**
+ * Close the connection to the metadata store.
+ */
+ void disconnect();
+}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordJacksonSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordJacksonSerializer.java
new file mode 100644
index 0000000..989017a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordJacksonSerializer.java
@@ -0,0 +1,67 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * ZNRecordJacksonSerializer serializes ZNRecord objects into a byte array using Jackson's
+ * ObjectMapper. Note that this serializer doesn't check the size of the resulting binary.
+ */
+public class ZNRecordJacksonSerializer implements ZkSerializer {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ @Override
+ public byte[] serialize(Object record) throws ZkMarshallingError {
+ if (!(record instanceof ZNRecord)) {
+ // null is NOT an instance of any class
+ throw new HelixException("Input object is not of type ZNRecord (was " + record + ")");
+ }
+ ZNRecord znRecord = (ZNRecord) record;
+
+ try {
+ return OBJECT_MAPPER.writeValueAsBytes(znRecord);
+ } catch (IOException e) {
+ throw new HelixException(
+ String.format("Exception during serialization. ZNRecord id: %s", znRecord.getId()), e);
+ }
+ }
+
+ @Override
+ public Object deserialize(byte[] bytes) throws ZkMarshallingError {
+ if (bytes == null || bytes.length == 0) {
+ // reading a parent/null node
+ return null;
+ }
+
+ ZNRecord record;
+ try {
+ record = OBJECT_MAPPER.readValue(bytes, ZNRecord.class);
+ } catch (IOException e) {
+ throw new HelixException("Exception during deserialization!", e);
+ }
+ return record;
+ }
+}
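
A minimal round-trip sketch of the serializer above (the record id "test" is an
arbitrary example value):

    import org.apache.helix.ZNRecord;
    import org.apache.helix.manager.zk.ZNRecordJacksonSerializer;

    public class SerializerSketch {
      public static void main(String[] args) {
        ZNRecordJacksonSerializer serializer = new ZNRecordJacksonSerializer();

        ZNRecord record = new ZNRecord("test");
        record.setSimpleField("field", "value");

        // Serialize to a byte array; note that no size check is performed here
        byte[] bytes = serializer.serialize(record);

        // Deserialize back into a ZNRecord
        ZNRecord restored = (ZNRecord) serializer.deserialize(bytes);
        System.out.println(restored.getId()); // prints "test"
      }
    }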
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
new file mode 100644
index 0000000..24c7c8e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
@@ -0,0 +1,326 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.BucketDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
+import org.apache.helix.util.GZipCompressionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
+ private static Logger LOG = LoggerFactory.getLogger(ZkBucketDataAccessor.class);
+
+ private static final int DEFAULT_NUM_VERSIONS = 2;
+ private static final String BUCKET_SIZE_KEY = "BUCKET_SIZE";
+ private static final String DATA_SIZE_KEY = "DATA_SIZE";
+ private static final String WRITE_LOCK_KEY = "WRITE_LOCK";
+ private static final String LAST_SUCCESS_KEY = "LAST_SUCCESS";
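+
+ // ZNode layout per write path (as used below): the metadata ZNRecord (BUCKET_SIZE, DATA_SIZE,
+ // LAST_SUCCESS) lives at <path>; the compressed data buckets live at
+ // <path>/<version>/<bucketIndex>; and <path>/WRITE_LOCK is an ephemeral node used as a lock.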
+
+ // 50KB for the default bucket size
+ private static final int DEFAULT_BUCKET_SIZE = 50 * 1024;
+ private final int _bucketSize;
+ private final int _numVersions;
+ private ZkSerializer _zkSerializer;
+ private HelixZkClient _zkClient;
+ private HelixZkClient _znRecordClient;
+ private BaseDataAccessor _zkBaseDataAccessor;
+ private BaseDataAccessor<ZNRecord> _znRecordBaseDataAccessor;
+
+ /**
+ * Constructor that allows a custom bucket size.
+ * @param zkAddr ZooKeeper address to connect to
+ * @param bucketSize size of each data bucket in bytes
+ * @param numVersions number of versions to store in ZK
+ */
+ public ZkBucketDataAccessor(String zkAddr, int bucketSize, int numVersions) {
+ // There are two HelixZkClients:
+ // 1. _zkClient (dedicated), used by _zkBaseDataAccessor for writes of binary bucket data
+ // 2. _znRecordClient (shared), used by _znRecordBaseDataAccessor for writes of ZNRecord (metadata)
+ _zkClient = DedicatedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
+ _zkClient.setZkSerializer(new ZkSerializer() {
+ @Override
+ public byte[] serialize(Object data) throws ZkMarshallingError {
+ if (data instanceof byte[]) {
+ return (byte[]) data;
+ }
+ throw new HelixException("ZkBucketDataAccesor only supports a byte array as an argument!");
+ }
+
+ @Override
+ public Object deserialize(byte[] data) throws ZkMarshallingError {
+ return data;
+ }
+ });
+ _zkBaseDataAccessor = new ZkBaseDataAccessor(_zkClient);
+
+ // TODO: Optimize serialization with Jackson
+ // TODO: Or use a better binary serialization protocol
+ // TODO: Consider making this also binary
+ // TODO: Consider an async write for the metadata as well
+ _znRecordClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
+ _znRecordBaseDataAccessor = new ZkBaseDataAccessor<>(_znRecordClient);
+ _znRecordClient.setZkSerializer(new ZNRecordSerializer());
+
+ _zkSerializer = new ZNRecordJacksonSerializer();
+ _bucketSize = bucketSize;
+ _numVersions = numVersions;
+ }
+
+ /**
+ * Constructor that uses the default bucket size and version count.
+ * @param zkAddr ZooKeeper address to connect to
+ */
+ public ZkBucketDataAccessor(String zkAddr) {
+ this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_NUM_VERSIONS);
+ }
+
+ @Override
+ public <T extends HelixProperty> boolean compressedBucketWrite(String path, T value)
+ throws IOException {
+ // Take the ZNRecord and serialize it into a byte[]
+ byte[] serializedRecord = _zkSerializer.serialize(value.getRecord());
+ // Compress the byte[]
+ byte[] compressedRecord = GZipCompressionUtil.compress(serializedRecord);
+ // Compute N - number of buckets
+ int numBuckets = (compressedRecord.length + _bucketSize - 1) / _bucketSize;
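+ // (Ceiling division: e.g. 120KB of compressed data with 50KB buckets yields 3 buckets,
+ // the last of which is only partially filled.)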
+
+ if (tryLock(path)) {
+ try {
+ // Read or initialize metadata and compute the last success version index
+ ZNRecord metadataRecord =
+ _znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
+ if (metadataRecord == null) {
+ metadataRecord = new ZNRecord(extractIdFromPath(path));
+ }
+ int lastSuccessIndex =
+ (metadataRecord.getIntField(LAST_SUCCESS_KEY, -1) + 1) % _numVersions;
+ String dataPath = path + "/" + lastSuccessIndex;
+
+ List<String> paths = new ArrayList<>();
+ List<Object> buckets = new ArrayList<>();
+
+ int ptr = 0;
+ int counter = 0;
+ while (counter < numBuckets) {
+ paths.add(dataPath + "/" + counter);
+ if (counter == numBuckets - 1) {
+ // The last bucket may be smaller than _bucketSize: copy up to the end of the array.
+ // (Using the array length as the end index also covers the case where the data size
+ // is an exact multiple of _bucketSize, where length % _bucketSize would be 0.)
+ buckets.add(Arrays.copyOfRange(compressedRecord, ptr, compressedRecord.length));
+ } else {
+ buckets.add(Arrays.copyOfRange(compressedRecord, ptr, ptr + _bucketSize));
+ }
+ ptr += _bucketSize;
+ counter++;
+ }
+
+ // Do a cleanup of previous data
+ if (!_zkBaseDataAccessor.remove(dataPath, AccessOption.PERSISTENT)) {
+ // Clean-up is not critical so upon failure, we log instead of throwing an exception
+ LOG.warn("Failed to clean up previous bucketed data in data path: {}", dataPath);
+ }
+
+ // Do an async set to ZK
+ boolean[] success =
+ _zkBaseDataAccessor.setChildren(paths, buckets, AccessOption.PERSISTENT);
+ // Throw an exception and fail the whole write if any bucket write failed
+ for (boolean s : success) {
+ if (!s) {
+ throw new HelixException(
+ String.format("Failed to write the data buckets for path: %s", path));
+ }
+ }
+
+ // Data write completed, so update the metadata with the last success index
+ // Note that the metadata ZNode is written using a sync write
+ metadataRecord.setIntField(BUCKET_SIZE_KEY, _bucketSize);
+ metadataRecord.setLongField(DATA_SIZE_KEY, compressedRecord.length);
+ metadataRecord.setIntField(LAST_SUCCESS_KEY, lastSuccessIndex);
+ if (!_znRecordBaseDataAccessor.set(path, metadataRecord, AccessOption.PERSISTENT)) {
+ throw new HelixException(
+ String.format("Failed to write the metadata at path: %s!", path));
+ }
+ } finally {
+ // Critical section for write ends here
+ unlock(path);
+ }
+ return true;
+ }
+ throw new HelixException(String.format("Could not acquire lock for write. Path: %s", path));
+ }
+
+ @Override
+ public <T extends HelixProperty> HelixProperty compressedBucketRead(String path,
+ Class<T> helixPropertySubType) {
+ return helixPropertySubType.cast(compressedBucketRead(path));
+ }
+
+ @Override
+ public void compressedBucketDelete(String path) {
+ if (!_zkBaseDataAccessor.remove(path, AccessOption.PERSISTENT)) {
+ throw new HelixException(String.format("Failed to delete the bucket data! Path: %s", path));
+ }
+ }
+
+ @Override
+ public void disconnect() {
+ if (!_zkClient.isClosed()) {
+ _zkClient.close();
+ }
+ if (!_znRecordClient.isClosed()) {
+ _znRecordClient.close();
+ }
+ }
+
+ private HelixProperty compressedBucketRead(String path) {
+ // TODO: Incorporate parallelism into reads instead of locking the whole thing against other
+ // reads and writes
+ if (tryLock(path)) {
+ try {
+ // Retrieve the metadata
+ ZNRecord metadataRecord =
+ _znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
+ if (metadataRecord == null) {
+ throw new HelixException(
+ String.format("Metadata ZNRecord does not exist for path: %s", path));
+ }
+
+ int bucketSize = metadataRecord.getIntField(BUCKET_SIZE_KEY, -1);
+ int dataSize = metadataRecord.getIntField(DATA_SIZE_KEY, -1);
+ int lastSuccessIndex = metadataRecord.getIntField(LAST_SUCCESS_KEY, -1);
+ if (lastSuccessIndex == -1) {
+ throw new HelixException(String.format("Metadata ZNRecord does not have %s! Path: %s",
+ LAST_SUCCESS_KEY, path));
+ }
+ if (bucketSize == -1) {
+ throw new HelixException(
+ String.format("Metadata ZNRecord does not have %s! Path: %s", BUCKET_SIZE_KEY, path));
+ }
+ if (dataSize == -1) {
+ throw new HelixException(
+ String.format("Metadata ZNRecord does not have %s! Path: %s", DATA_SIZE_KEY, path));
+ }
+
+ // Compute N - number of buckets, using the bucket size recorded in the metadata
+ // (which may differ from this accessor's configured _bucketSize)
+ int numBuckets = (dataSize + bucketSize - 1) / bucketSize;
+ byte[] compressedRecord = new byte[dataSize];
+ String dataPath = path + "/" + lastSuccessIndex;
+
+ List<String> paths = new ArrayList<>();
+ for (int i = 0; i < numBuckets; i++) {
+ paths.add(dataPath + "/" + i);
+ }
+
+ // Async get
+ List buckets = _zkBaseDataAccessor.get(paths, null, AccessOption.PERSISTENT, true);
+
+ // Combine buckets into one byte array
+ int copyPtr = 0;
+ for (int i = 0; i < numBuckets; i++) {
+ if (i == numBuckets - 1) {
+ // The last bucket may be smaller than bucketSize: copy only the remaining bytes.
+ // (dataSize - copyPtr also covers data sizes that are exact multiples of bucketSize.)
+ System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, dataSize - copyPtr);
+ } else {
+ System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, bucketSize);
+ copyPtr += bucketSize;
+ }
+ }
+
+ // Decompress the byte array
+ ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressedRecord);
+ byte[] serializedRecord;
+ try {
+ serializedRecord = GZipCompressionUtil.uncompress(byteArrayInputStream);
+ } catch (IOException e) {
+ throw new HelixException(String.format("Failed to decompress path: %s!", path), e);
+ }
+
+ // Deserialize the record to retrieve the original
+ ZNRecord originalRecord = (ZNRecord) _zkSerializer.deserialize(serializedRecord);
+ return new HelixProperty(originalRecord);
+ } finally {
+ // Critical section for read ends here
+ unlock(path);
+ }
+ }
+ throw new HelixException(String.format("Could not acquire lock for read. Path: %s", path));
+ }
+
+ /**
+ * Returns the last element of the given path when split by "/" (that is, the ZNode name).
+ * @param path path to extract the ID from
+ * @return the last path element
+ */
+ private String extractIdFromPath(String path) {
+ String[] splitPath = path.split("/");
+ return splitPath[splitPath.length - 1];
+ }
+
+ /**
+ * Acquires the lock (creates an ephemeral node) only if it is free (no ephemeral node already
+ * exists) at the time of invocation.
+ * @param path path under which the lock node is created
+ * @return true if the lock was acquired
+ */
+ private boolean tryLock(String path) {
+ // Check if another write is taking place and if not, create an ephemeral node to simulate
+ // acquiring of a lock
+ return !_zkBaseDataAccessor.exists(path + "/" + WRITE_LOCK_KEY, AccessOption.EPHEMERAL)
+ && _zkBaseDataAccessor.set(path + "/" + WRITE_LOCK_KEY, new byte[0],
+ AccessOption.EPHEMERAL);
+ }
+
+ /**
+ * Releases the lock (removes the ephemeral node).
+ * @param path path under which the lock node was created
+ */
+ private void unlock(String path) {
+ // Write succeeded, so release the lock
+ if (!_zkBaseDataAccessor.remove(path + "/" + WRITE_LOCK_KEY, AccessOption.EPHEMERAL)) {
+ throw new HelixException(String.format("Could not remove ephemeral node for path: %s", path));
+ }
+ // TODO: In case of remove failure, we risk a lock never getting released.
+ // TODO: Consider two possible improvements
+ // TODO: 1. Use ephemeral owner id for the same connection to reclaim the lock
+ // TODO: 2. Use "lease" - lock with a timeout
+ }
+
+ @Override
+ public void close() throws Exception {
+ disconnect();
+ }
+}
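
Since ZkBucketDataAccessor implements AutoCloseable, it can also be used with
try-with-resources. A sketch with a custom bucket size and version count (the
address, path, and sizing values below are hypothetical):

    import org.apache.helix.HelixProperty;
    import org.apache.helix.ZNRecord;
    import org.apache.helix.manager.zk.ZkBucketDataAccessor;

    public class CustomBucketSketch {
      public static void main(String[] args) throws Exception {
        // 100KB buckets; keep up to 3 versions of the data under each path
        try (ZkBucketDataAccessor accessor =
            new ZkBucketDataAccessor("localhost:2181", 100 * 1024, 3)) {
          accessor.compressedBucketWrite("/example", new HelixProperty(new ZNRecord("example")));
          // On exit, close() calls disconnect(), releasing both underlying ZkClients
        }
      }
    }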
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java
new file mode 100644
index 0000000..4c28835
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java
@@ -0,0 +1,233 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BucketDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.common.ZkTestBase;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestZkBucketDataAccessor extends ZkTestBase {
+
+ private static final String PATH = "/" + TestHelper.getTestClassName();
+ private static final String NAME_KEY = TestHelper.getTestClassName();
+ private static final String LAST_SUCCESS_KEY = "LAST_SUCCESS";
+ private static final String BUCKET_SIZE_KEY = "BUCKET_SIZE";
+ private static final String WRITE_LOCK_KEY = "WRITE_LOCK";
+
+ // Populate list and map fields for content comparison
+ private static final List<String> LIST_FIELD = ImmutableList.of("1", "2");
+ private static final Map<String, String> MAP_FIELD = ImmutableMap.of("1", "2");
+
+ private BucketDataAccessor _bucketDataAccessor;
+
+ @BeforeClass
+ public void beforeClass() {
+ _bucketDataAccessor = new ZkBucketDataAccessor(ZK_ADDR);
+ }
+
+ @AfterClass
+ public void afterClass() {
+ _bucketDataAccessor.disconnect();
+ }
+
+ /**
+ * Attempt writing a simple HelixProperty using compressedBucketWrite.
+ * @throws IOException
+ */
+ @Test
+ public void testCompressedBucketWrite() throws IOException {
+ ZNRecord record = new ZNRecord(NAME_KEY);
+ record.setSimpleField(NAME_KEY, NAME_KEY);
+ record.setListField(NAME_KEY, LIST_FIELD);
+ record.setMapField(NAME_KEY, MAP_FIELD);
+ Assert.assertTrue(_bucketDataAccessor.compressedBucketWrite(PATH, new HelixProperty(record)));
+ }
+
+ /**
+ * Verify that the record read back is the same record that was written in
+ * {@link #testCompressedBucketWrite()}.
+ */
+ @Test(dependsOnMethods = "testCompressedBucketWrite")
+ public void testCompressedBucketRead() {
+ HelixProperty readRecord = _bucketDataAccessor.compressedBucketRead(PATH, HelixProperty.class);
+ Assert.assertEquals(readRecord.getRecord().getSimpleField(NAME_KEY), NAME_KEY);
+ Assert.assertEquals(readRecord.getRecord().getListField(NAME_KEY), LIST_FIELD);
+ Assert.assertEquals(readRecord.getRecord().getMapField(NAME_KEY), MAP_FIELD);
+ _bucketDataAccessor.compressedBucketDelete(PATH);
+ }
+
+ /**
+ * Do 10 writes and check that there are 5 versions of the data.
+ */
+ @Test(dependsOnMethods = "testCompressedBucketRead")
+ public void testManyWritesWithVersionCounts() throws IOException {
+ int bucketSize = 50 * 1024;
+ int numVersions = 5;
+ int expectedLastSuccessfulIndex = 4;
+ String path = PATH + "2";
+ ZNRecord record = new ZNRecord(NAME_KEY);
+ record.setSimpleField(NAME_KEY, NAME_KEY);
+ record.setListField(NAME_KEY, LIST_FIELD);
+ record.setMapField(NAME_KEY, MAP_FIELD);
+
+ BucketDataAccessor bucketDataAccessor =
+ new ZkBucketDataAccessor(ZK_ADDR, bucketSize, numVersions);
+ for (int i = 0; i < 10; i++) {
+ bucketDataAccessor.compressedBucketWrite(path, new HelixProperty(record));
+ }
+
+ // Check that there are numVersions number of children under path
+ List<String> children = _baseAccessor.getChildNames(path, AccessOption.PERSISTENT);
+ Assert.assertEquals(children.size(), numVersions);
+
+ // Check that last successful index is 4 (since we did 10 writes)
+ ZNRecord metadata = _baseAccessor.get(path, null, AccessOption.PERSISTENT);
+ Assert.assertEquals(metadata.getIntField(LAST_SUCCESS_KEY, -1), expectedLastSuccessfulIndex);
+
+ // Clean up
+ bucketDataAccessor.compressedBucketDelete(path);
+ bucketDataAccessor.disconnect();
+ }
+
+ /**
+ * Write a HelixProperty with a large number of entries using BucketDataAccessor and read it back.
+ */
+ @Test(dependsOnMethods = "testManyWritesWithVersionCounts")
+ public void testLargeWriteAndRead() throws IOException {
+ String name = "largeResourceAssignment";
+ HelixProperty property = createLargeHelixProperty(name, 100000);
+
+ // Perform large write
+ long before = System.currentTimeMillis();
+ _bucketDataAccessor.compressedBucketWrite("/" + name, property);
+ long after = System.currentTimeMillis();
+ System.out.println("Write took " + (after - before) + " ms");
+
+ // Read it back
+ before = System.currentTimeMillis();
+ HelixProperty readRecord =
+ _bucketDataAccessor.compressedBucketRead("/" + name, HelixProperty.class);
+ after = System.currentTimeMillis();
+ System.out.println("Read took " + (after - before) + " ms");
+
+ // Check against the original HelixProperty
+ Assert.assertEquals(readRecord, property);
+ }
+
+ /**
+ * Tests that each write cleans up previous bucketed data. This method writes a small amount of
+ * data and checks that the data buckets from the large write performed in the previous test
+ * method have been cleaned up.
+ * @throws IOException
+ */
+ @Test(dependsOnMethods = "testLargeWriteAndRead")
+ public void testCleanupBeforeWrite() throws IOException {
+ // Create a HelixProperty of a very small size with the same name as the large HelixProperty
+ // created from the previous method
+ String name = "largeResourceAssignment";
+ HelixProperty property = new HelixProperty(name);
+ property.getRecord().setIntField("Hi", 10);
+
+ // Get the bucket count from the write performed in the previous method
+ ZNRecord metadata = _baseAccessor.get("/" + name, null, AccessOption.PERSISTENT);
+ int origBucketSize = metadata.getIntField(BUCKET_SIZE_KEY, -1);
+
+ // Perform a write twice to overwrite both versions
+ _bucketDataAccessor.compressedBucketWrite("/" + name, property);
+ _bucketDataAccessor.compressedBucketWrite("/" + name, property);
+
+ // Check that the children count for version 0 (version for the large write) is 1
+ Assert.assertEquals(
+ _baseAccessor.getChildNames("/" + name + "/0", AccessOption.PERSISTENT).size(), 1);
+
+ // Clean up
+ _bucketDataAccessor.compressedBucketDelete("/" + name);
+ }
+
+ /**
+ * Tests that concurrent reads and writes are not allowed: after artificially creating the lock
+ * ZNode, subsequent read and write operations should fail to acquire the lock.
+ * @throws IOException
+ */
+ @Test(dependsOnMethods = "testCleanupBeforeWrite")
+ public void testFailureToAcquireLock() throws Exception {
+ String name = "acquireLock";
+ // Use a large HelixProperty to simulate a write that keeps the lock for some time
+ HelixProperty property = createLargeHelixProperty(name, 100);
+
+ // Artificially create the ephemeral ZNode
+ _baseAccessor.create("/" + name + "/" + WRITE_LOCK_KEY, new ZNRecord(name),
+ AccessOption.EPHEMERAL);
+
+ // Test write
+ try {
+ _bucketDataAccessor.compressedBucketWrite("/" + name, property);
+ Assert.fail("Should fail due to an already-existing lock ZNode!");
+ } catch (HelixException e) {
+ // Expect an exception
+ }
+
+ // Test read
+ try {
+ _bucketDataAccessor.compressedBucketRead("/" + name, HelixProperty.class);
+ Assert.fail("Should fail due to an already-existing lock ZNode!");
+ } catch (HelixException e) {
+ // Expect an exception
+ }
+
+ // Clean up
+ _bucketDataAccessor.compressedBucketDelete("/" + name);
+ }
+
+ private HelixProperty createLargeHelixProperty(String name, int numEntries) {
+ HelixProperty property = new HelixProperty(name);
+ Random random = new Random();
+ for (int i = 0; i < numEntries; i++) {
+ // Create a random string every time
+ byte[] arrayKey = new byte[20];
+ byte[] arrayVal = new byte[20];
+ random.nextBytes(arrayKey);
+ random.nextBytes(arrayVal);
+ String randomStrKey = new String(arrayKey, StandardCharsets.UTF_8);
+ String randomStrVal = new String(arrayVal, StandardCharsets.UTF_8);
+
+ // Dummy mapField
+ Map<String, String> mapField = new HashMap<>();
+ mapField.put(randomStrKey, randomStrVal);
+
+ property.getRecord().setMapField(randomStrKey, mapField);
+ }
+ return property;
+ }
+}