You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/28 22:32:54 UTC

[helix] 16/50: Add BucketDataAccessor for large writes

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git

commit fdf20c9fa77bd472a33a2effd53a414ea90e842f
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Fri Sep 6 16:34:53 2019 -0700

    Add BucketDataAccessor for large writes
    
    For the new WAGED rebalancer, it's necessary to have a data accessor that will allow writes of data exceeding 1MB. ZooKeeper's ZNode size is capped at 1MB, so BucketDataAccessor interface and ZkBucketDataAccessor help us achieve this.
    Changelist:
    1. Add BucketDataAccessor and ZkBucketDataAccessor
    2. Add necessary serializers
    3. Add an integration test against ZK
---
 .../java/org/apache/helix/BucketDataAccessor.java  |  53 ++++
 .../manager/zk/ZNRecordJacksonSerializer.java      |  67 +++++
 .../helix/manager/zk/ZkBucketDataAccessor.java     | 326 +++++++++++++++++++++
 .../helix/manager/zk/TestZkBucketDataAccessor.java | 233 +++++++++++++++
 4 files changed, 679 insertions(+)

diff --git a/helix-core/src/main/java/org/apache/helix/BucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/BucketDataAccessor.java
new file mode 100644
index 0000000..2008c23
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/BucketDataAccessor.java
@@ -0,0 +1,53 @@
+package org.apache.helix;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+
+public interface BucketDataAccessor {
+
+  /**
+   * Write a HelixProperty in buckets, compressed.
+   * @param path path to which the metadata will be written to
+   * @param value HelixProperty to write
+   * @param <T>
+   * @throws IOException
+   */
+  <T extends HelixProperty> boolean compressedBucketWrite(String path, T value) throws IOException;
+
+  /**
+   * Read a HelixProperty that was written in buckets, compressed.
+   * @param path
+   * @param helixPropertySubType the subtype of HelixProperty the data was written in
+   * @param <T>
+   */
+  <T extends HelixProperty> HelixProperty compressedBucketRead(String path,
+      Class<T> helixPropertySubType);
+
+  /**
+   * Delete the HelixProperty in the given path.
+   * @param path
+   */
+  void compressedBucketDelete(String path);
+
+  /**
+   * Close the connection to the metadata store.
+   */
+  void disconnect();
+}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordJacksonSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordJacksonSerializer.java
new file mode 100644
index 0000000..989017a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordJacksonSerializer.java
@@ -0,0 +1,67 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * ZNRecordJacksonSerializer serializes ZNRecord objects into a byte array using MessagePack's
+ * serializer. Note that this serializer doesn't check for the size of the resulting binary.
+ */
+public class ZNRecordJacksonSerializer implements ZkSerializer {
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  @Override
+  public byte[] serialize(Object record) throws ZkMarshallingError {
+    if (!(record instanceof ZNRecord)) {
+      // null is NOT an instance of any class
+      throw new HelixException("Input object is not of type ZNRecord (was " + record + ")");
+    }
+    ZNRecord znRecord = (ZNRecord) record;
+
+    try {
+      return OBJECT_MAPPER.writeValueAsBytes(znRecord);
+    } catch (IOException e) {
+      throw new HelixException(
+          String.format("Exception during serialization. ZNRecord id: %s", znRecord.getId()), e);
+    }
+  }
+
+  @Override
+  public Object deserialize(byte[] bytes) throws ZkMarshallingError {
+    if (bytes == null || bytes.length == 0) {
+      // reading a parent/null node
+      return null;
+    }
+
+    ZNRecord record;
+    try {
+      record = OBJECT_MAPPER.readValue(bytes, ZNRecord.class);
+    } catch (IOException e) {
+      throw new HelixException("Exception during deserialization!", e);
+    }
+    return record;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
new file mode 100644
index 0000000..24c7c8e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
@@ -0,0 +1,326 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.BucketDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
+import org.apache.helix.util.GZipCompressionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
+  private static Logger LOG = LoggerFactory.getLogger(ZkBucketDataAccessor.class);
+
+  private static final int DEFAULT_NUM_VERSIONS = 2;
+  private static final String BUCKET_SIZE_KEY = "BUCKET_SIZE";
+  private static final String DATA_SIZE_KEY = "DATA_SIZE";
+  private static final String WRITE_LOCK_KEY = "WRITE_LOCK";
+  private static final String LAST_SUCCESS_KEY = "LAST_SUCCESS";
+
+  // 100 KB for default bucket size
+  private static final int DEFAULT_BUCKET_SIZE = 50 * 1024;
+  private final int _bucketSize;
+  private final int _numVersions;
+  private ZkSerializer _zkSerializer;
+  private HelixZkClient _zkClient;
+  private HelixZkClient _znRecordClient;
+  private BaseDataAccessor _zkBaseDataAccessor;
+  private BaseDataAccessor<ZNRecord> _znRecordBaseDataAccessor;
+
+  /**
+   * Constructor that allows a custom bucket size.
+   * @param zkAddr
+   * @param bucketSize
+   * @param numVersions number of versions to store in ZK
+   */
+  public ZkBucketDataAccessor(String zkAddr, int bucketSize, int numVersions) {
+    // There are two HelixZkClients:
+    // 1. _zkBaseDataAccessor for writes of binary data
+    // 2. _znRecordBaseDataAccessor for writes of ZNRecord (metadata)
+    _zkClient = DedicatedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
+    _zkClient.setZkSerializer(new ZkSerializer() {
+      @Override
+      public byte[] serialize(Object data) throws ZkMarshallingError {
+        if (data instanceof byte[]) {
+          return (byte[]) data;
+        }
+        throw new HelixException("ZkBucketDataAccesor only supports a byte array as an argument!");
+      }
+
+      @Override
+      public Object deserialize(byte[] data) throws ZkMarshallingError {
+        return data;
+      }
+    });
+    _zkBaseDataAccessor = new ZkBaseDataAccessor(_zkClient);
+
+    // TODO: Optimize serialization with Jackson
+    // TODO: Or use a better binary serialization protocol
+    // TODO: Consider making this also binary
+    // TODO: Consider an async write for the metadata as well
+    _znRecordClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
+    _znRecordBaseDataAccessor = new ZkBaseDataAccessor<>(_znRecordClient);
+    _znRecordClient.setZkSerializer(new ZNRecordSerializer());
+
+    _zkSerializer = new ZNRecordJacksonSerializer();
+    _bucketSize = bucketSize;
+    _numVersions = numVersions;
+  }
+
+  /**
+   * Constructor that uses a default bucket size.
+   * @param zkAddr
+   */
+  public ZkBucketDataAccessor(String zkAddr) {
+    this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_NUM_VERSIONS);
+  }
+
+  @Override
+  public <T extends HelixProperty> boolean compressedBucketWrite(String path, T value)
+      throws IOException {
+    // Take the ZNrecord and serialize it (get byte[])
+    byte[] serializedRecord = _zkSerializer.serialize(value.getRecord());
+    // Compress the byte[]
+    byte[] compressedRecord = GZipCompressionUtil.compress(serializedRecord);
+    // Compute N - number of buckets
+    int numBuckets = (compressedRecord.length + _bucketSize - 1) / _bucketSize;
+
+    if (tryLock(path)) {
+      try {
+        // Read or initialize metadata and compute the last success version index
+        ZNRecord metadataRecord =
+            _znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
+        if (metadataRecord == null) {
+          metadataRecord = new ZNRecord(extractIdFromPath(path));
+        }
+        int lastSuccessIndex =
+            (metadataRecord.getIntField(LAST_SUCCESS_KEY, -1) + 1) % _numVersions;
+        String dataPath = path + "/" + lastSuccessIndex;
+
+        List<String> paths = new ArrayList<>();
+        List<Object> buckets = new ArrayList<>();
+
+        int ptr = 0;
+        int counter = 0;
+        while (counter < numBuckets) {
+          paths.add(dataPath + "/" + counter);
+          if (counter == numBuckets - 1) {
+            // Special treatment for the last bucket
+            buckets.add(Arrays.copyOfRange(compressedRecord, ptr,
+                ptr + compressedRecord.length % _bucketSize));
+          } else {
+            buckets.add(Arrays.copyOfRange(compressedRecord, ptr, ptr + _bucketSize));
+          }
+          ptr += _bucketSize;
+          counter++;
+        }
+
+        // Do a cleanup of previous data
+        if (!_zkBaseDataAccessor.remove(dataPath, AccessOption.PERSISTENT)) {
+          // Clean-up is not critical so upon failure, we log instead of throwing an exception
+          LOG.warn("Failed to clean up previous bucketed data in data path: {}", dataPath);
+        }
+
+        // Do an async set to ZK
+        boolean[] success =
+            _zkBaseDataAccessor.setChildren(paths, buckets, AccessOption.PERSISTENT);
+        // Exception and fail the write if any failed
+        for (boolean s : success) {
+          if (!s) {
+            throw new HelixException(
+                String.format("Failed to write the data buckets for path: %s", path));
+          }
+        }
+
+        // Data write completed, so update the metadata with last success index
+        // Note that the metadata ZNodes is written using sync write
+        metadataRecord.setIntField(BUCKET_SIZE_KEY, _bucketSize);
+        metadataRecord.setLongField(DATA_SIZE_KEY, compressedRecord.length);
+        metadataRecord.setIntField(LAST_SUCCESS_KEY, lastSuccessIndex);
+        if (!_znRecordBaseDataAccessor.set(path, metadataRecord, AccessOption.PERSISTENT)) {
+          throw new HelixException(
+              String.format("Failed to write the metadata at path: %s!", path));
+        }
+      } finally {
+        // Critical section for write ends here
+        unlock(path);
+      }
+      return true;
+    }
+    throw new HelixException(String.format("Could not acquire lock for write. Path: %s", path));
+  }
+
+  @Override
+  public <T extends HelixProperty> HelixProperty compressedBucketRead(String path,
+      Class<T> helixPropertySubType) {
+    return helixPropertySubType.cast(compressedBucketRead(path));
+  }
+
+  @Override
+  public void compressedBucketDelete(String path) {
+    if (!_zkBaseDataAccessor.remove(path, AccessOption.PERSISTENT)) {
+      throw new HelixException(String.format("Failed to delete the bucket data! Path: %s", path));
+    }
+  }
+
+  @Override
+  public void disconnect() {
+    if (!_zkClient.isClosed()) {
+      _zkClient.close();
+    }
+    if (!_znRecordClient.isClosed()) {
+      _znRecordClient.close();
+    }
+  }
+
+  private HelixProperty compressedBucketRead(String path) {
+    // TODO: Incorporate parallelism into reads instead of locking the whole thing against other
+    // reads and writes
+    if (tryLock(path)) {
+      try {
+        // Retrieve the metadata
+        ZNRecord metadataRecord =
+            _znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
+        if (metadataRecord == null) {
+          throw new HelixException(
+              String.format("Metadata ZNRecord does not exist for path: %s", path));
+        }
+
+        int bucketSize = metadataRecord.getIntField(BUCKET_SIZE_KEY, -1);
+        int dataSize = metadataRecord.getIntField(DATA_SIZE_KEY, -1);
+        int lastSuccessIndex = metadataRecord.getIntField(LAST_SUCCESS_KEY, -1);
+        if (lastSuccessIndex == -1) {
+          throw new HelixException(String.format("Metadata ZNRecord does not have %s! Path: %s",
+              LAST_SUCCESS_KEY, path));
+        }
+        if (bucketSize == -1) {
+          throw new HelixException(
+              String.format("Metadata ZNRecord does not have %s! Path: %s", BUCKET_SIZE_KEY, path));
+        }
+        if (dataSize == -1) {
+          throw new HelixException(
+              String.format("Metadata ZNRecord does not have %s! Path: %s", DATA_SIZE_KEY, path));
+        }
+
+        // Compute N - number of buckets
+        int numBuckets = (dataSize + _bucketSize - 1) / _bucketSize;
+        byte[] compressedRecord = new byte[dataSize];
+        String dataPath = path + "/" + lastSuccessIndex;
+
+        List<String> paths = new ArrayList<>();
+        for (int i = 0; i < numBuckets; i++) {
+          paths.add(dataPath + "/" + i);
+        }
+
+        // Async get
+        List buckets = _zkBaseDataAccessor.get(paths, null, AccessOption.PERSISTENT, true);
+
+        // Combine buckets into one byte array
+        int copyPtr = 0;
+        for (int i = 0; i < numBuckets; i++) {
+          if (i == numBuckets - 1) {
+            // Special treatment for the last bucket
+            System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, dataSize % bucketSize);
+          } else {
+            System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, bucketSize);
+            copyPtr += bucketSize;
+          }
+        }
+
+        // Decompress the byte array
+        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressedRecord);
+        byte[] serializedRecord;
+        try {
+          serializedRecord = GZipCompressionUtil.uncompress(byteArrayInputStream);
+        } catch (IOException e) {
+          throw new HelixException(String.format("Failed to decompress path: %s!", path), e);
+        }
+
+        // Deserialize the record to retrieve the original
+        ZNRecord originalRecord = (ZNRecord) _zkSerializer.deserialize(serializedRecord);
+        return new HelixProperty(originalRecord);
+      } finally {
+        // Critical section for read ends here
+        unlock(path);
+      }
+    }
+    throw new HelixException(String.format("Could not acquire lock for read. Path: %s", path));
+  }
+
+  /**
+   * Returns the last string element in a split String array by /.
+   * @param path
+   * @return
+   */
+  private String extractIdFromPath(String path) {
+    String[] splitPath = path.split("/");
+    return splitPath[splitPath.length - 1];
+  }
+
+  /**
+   * Acquires the lock (create an ephemeral node) only if it is free (no ephemeral node already
+   * exists) at the time of invocation.
+   * @param path
+   * @return
+   */
+  private boolean tryLock(String path) {
+    // Check if another write is taking place and if not, create an ephemeral node to simulate
+    // acquiring of a lock
+    return !_zkBaseDataAccessor.exists(path + "/" + WRITE_LOCK_KEY, AccessOption.EPHEMERAL)
+        && _zkBaseDataAccessor.set(path + "/" + WRITE_LOCK_KEY, new byte[0],
+            AccessOption.EPHEMERAL);
+  }
+
+  /**
+   * Releases the lock (removes the ephemeral node).
+   * @param path
+   */
+  private void unlock(String path) {
+    // Write succeeded, so release the lock
+    if (!_zkBaseDataAccessor.remove(path + "/" + WRITE_LOCK_KEY, AccessOption.EPHEMERAL)) {
+      throw new HelixException(String.format("Could not remove ephemeral node for path: %s", path));
+    }
+    // TODO: In case of remove failure, we risk a lock never getting released.
+    // TODO: Consider two possible improvements
+    // TODO: 1. Use ephemeral owner id for the same connection to reclaim the lock
+    // TODO: 2. Use "lease" - lock with a timeout
+  }
+
+  @Override
+  public void close() throws Exception {
+    disconnect();
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java
new file mode 100644
index 0000000..4c28835
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java
@@ -0,0 +1,233 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BucketDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.common.ZkTestBase;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestZkBucketDataAccessor extends ZkTestBase {
+
+  private static final String PATH = "/" + TestHelper.getTestClassName();
+  private static final String NAME_KEY = TestHelper.getTestClassName();
+  private static final String LAST_SUCCESS_KEY = "LAST_SUCCESS";
+  private static final String BUCKET_SIZE_KEY = "BUCKET_SIZE";
+  private static final String WRITE_LOCK_KEY = "WRITE_LOCK";
+
+  // Populate list and map fields for content comparison
+  private static final List<String> LIST_FIELD = ImmutableList.of("1", "2");
+  private static final Map<String, String> MAP_FIELD = ImmutableMap.of("1", "2");
+
+  private BucketDataAccessor _bucketDataAccessor;
+
+  @BeforeClass
+  public void beforeClass() {
+    _bucketDataAccessor = new ZkBucketDataAccessor(ZK_ADDR);
+  }
+
+  @AfterClass
+  public void afterClass() {
+    _bucketDataAccessor.disconnect();
+  }
+
+  /**
+   * Attempt writing a simple HelixProperty using compressedBucketWrite.
+   * @throws IOException
+   */
+  @Test
+  public void testCompressedBucketWrite() throws IOException {
+    ZNRecord record = new ZNRecord(NAME_KEY);
+    record.setSimpleField(NAME_KEY, NAME_KEY);
+    record.setListField(NAME_KEY, LIST_FIELD);
+    record.setMapField(NAME_KEY, MAP_FIELD);
+    Assert.assertTrue(_bucketDataAccessor.compressedBucketWrite(PATH, new HelixProperty(record)));
+  }
+
+  /**
+   * The record written in {@link #testCompressedBucketWrite()} is the same record that was written.
+   */
+  @Test(dependsOnMethods = "testCompressedBucketWrite")
+  public void testCompressedBucketRead() {
+    HelixProperty readRecord = _bucketDataAccessor.compressedBucketRead(PATH, HelixProperty.class);
+    Assert.assertEquals(readRecord.getRecord().getSimpleField(NAME_KEY), NAME_KEY);
+    Assert.assertEquals(readRecord.getRecord().getListField(NAME_KEY), LIST_FIELD);
+    Assert.assertEquals(readRecord.getRecord().getMapField(NAME_KEY), MAP_FIELD);
+    _bucketDataAccessor.compressedBucketDelete(PATH);
+  }
+
+  /**
+   * Do 10 writes and check that there are 5 versions of the data.
+   */
+  @Test(dependsOnMethods = "testCompressedBucketRead")
+  public void testManyWritesWithVersionCounts() throws IOException {
+    int bucketSize = 50 * 1024;
+    int numVersions = 5;
+    int expectedLastSuccessfulIndex = 4;
+    String path = PATH + "2";
+    ZNRecord record = new ZNRecord(NAME_KEY);
+    record.setSimpleField(NAME_KEY, NAME_KEY);
+    record.setListField(NAME_KEY, LIST_FIELD);
+    record.setMapField(NAME_KEY, MAP_FIELD);
+
+    BucketDataAccessor bucketDataAccessor =
+        new ZkBucketDataAccessor(ZK_ADDR, bucketSize, numVersions);
+    for (int i = 0; i < 10; i++) {
+      bucketDataAccessor.compressedBucketWrite(path, new HelixProperty(record));
+    }
+
+    // Check that there are numVersions number of children under path
+    List<String> children = _baseAccessor.getChildNames(path, AccessOption.PERSISTENT);
+    Assert.assertEquals(children.size(), numVersions);
+
+    // Check that last successful index is 4 (since we did 10 writes)
+    ZNRecord metadata = _baseAccessor.get(path, null, AccessOption.PERSISTENT);
+    Assert.assertEquals(metadata.getIntField(LAST_SUCCESS_KEY, -1), expectedLastSuccessfulIndex);
+
+    // Clean up
+    bucketDataAccessor.compressedBucketDelete(path);
+    bucketDataAccessor.disconnect();
+  }
+
+  /**
+   * Write a HelixProperty with large number of entries using BucketDataAccessor and read it back.
+   */
+  @Test(dependsOnMethods = "testManyWritesWithVersionCounts")
+  public void testLargeWriteAndRead() throws IOException {
+    String name = "largeResourceAssignment";
+    HelixProperty property = createLargeHelixProperty(name, 100000);
+
+    // Perform large write
+    long before = System.currentTimeMillis();
+    _bucketDataAccessor.compressedBucketWrite("/" + name, property);
+    long after = System.currentTimeMillis();
+    System.out.println("Write took " + (after - before) + " ms");
+
+    // Read it back
+    before = System.currentTimeMillis();
+    HelixProperty readRecord =
+        _bucketDataAccessor.compressedBucketRead("/" + name, HelixProperty.class);
+    after = System.currentTimeMillis();
+    System.out.println("Read took " + (after - before) + " ms");
+
+    // Check against the original HelixProperty
+    Assert.assertEquals(readRecord, property);
+  }
+
+  /**
+   * Tests that each write cleans up previous bucketed data. This method writes some small amount of
+   * data and checks that the data buckets from the large write performed in the previous test
+   * method have been cleaned up.
+   * @throws IOException
+   */
+  @Test(dependsOnMethods = "testLargeWriteAndRead")
+  public void testCleanupBeforeWrite() throws IOException {
+    // Create a HelixProperty of a very small size with the same name as the large HelixProperty
+    // created from the previous method
+    String name = "largeResourceAssignment";
+    HelixProperty property = new HelixProperty(name);
+    property.getRecord().setIntField("Hi", 10);
+
+    // Get the bucket count from the write performed in the previous method
+    ZNRecord metadata = _baseAccessor.get("/" + name, null, AccessOption.PERSISTENT);
+    int origBucketSize = metadata.getIntField(BUCKET_SIZE_KEY, -1);
+
+    // Perform a write twice to overwrite both versions
+    _bucketDataAccessor.compressedBucketWrite("/" + name, property);
+    _bucketDataAccessor.compressedBucketWrite("/" + name, property);
+
+    // Check that the children count for version 0 (version for the large write) is 1
+    Assert.assertEquals(
+        _baseAccessor.getChildNames("/" + name + "/0", AccessOption.PERSISTENT).size(), 1);
+
+    // Clean up
+    _bucketDataAccessor.compressedBucketDelete("/" + name);
+  }
+
+  /**
+   * Test that no concurrent reads and writes are allowed by triggering multiple operations after
+   * creating an artificial lock.
+   * @throws IOException
+   */
+  @Test(dependsOnMethods = "testCleanupBeforeWrite")
+  public void testFailureToAcquireLock() throws Exception {
+    String name = "acquireLock";
+    // Use a large HelixProperty to simulate a write that keeps the lock for some time
+    HelixProperty property = createLargeHelixProperty(name, 100);
+
+    // Artificially create the ephemeral ZNode
+    _baseAccessor.create("/" + name + "/" + WRITE_LOCK_KEY, new ZNRecord(name),
+        AccessOption.EPHEMERAL);
+
+    // Test write
+    try {
+      _bucketDataAccessor.compressedBucketWrite("/" + name, property);
+      Assert.fail("Should fail due to an already-existing lock ZNode!");
+    } catch (HelixException e) {
+      // Expect an exception
+    }
+
+    // Test read
+    try {
+      _bucketDataAccessor.compressedBucketRead("/" + name, HelixProperty.class);
+      Assert.fail("Should fail due to an already-existing lock ZNode!");
+    } catch (HelixException e) {
+      // Expect an exception
+    }
+
+    // Clean up
+    _bucketDataAccessor.compressedBucketDelete("/" + name);
+  }
+
+  private HelixProperty createLargeHelixProperty(String name, int numEntries) {
+    HelixProperty property = new HelixProperty(name);
+    for (int i = 0; i < numEntries; i++) {
+      // Create a random string every time
+      byte[] arrayKey = new byte[20];
+      byte[] arrayVal = new byte[20];
+      new Random().nextBytes(arrayKey);
+      new Random().nextBytes(arrayVal);
+      String randomStrKey = new String(arrayKey, StandardCharsets.UTF_8);
+      String randomStrVal = new String(arrayVal, StandardCharsets.UTF_8);
+
+      // Dummy mapField
+      Map<String, String> mapField = new HashMap<>();
+      mapField.put(randomStrKey, randomStrVal);
+
+      property.getRecord().setMapField(randomStrKey, mapField);
+    }
+    return property;
+  }
+}