Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/28 22:33:18 UTC

[helix] 40/50: Increase parallelism for ZkBucketDataAccessor (#506)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 48d272cc84f62a995884a77ef6e9b101daf62256
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Fri Oct 18 18:41:29 2019 -0700

    Increase parallelism for ZkBucketDataAccessor (#506)
    
    * Increase parallelism for ZkBucketDataAccessor
    
    This diff improves parallelism and throughput for ZkBucketDataAccessor. It implements the following ideas (a short sketch of the version handshake follows this list):
    1. Optimistic Concurrency Control
    2. Monotonically Increasing Version Number
    3. Garbage Collection of Stale Metadata
    4. Retrying Reads Upon Failure
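    
    As a rough illustration of ideas 1 and 2 (not part of the patch itself; the class and
    method names below are placeholders, and only the I0Itec DataUpdater interface already
    used by this patch is assumed), the write path first reserves a monotonically increasing
    version number with a read-modify-write updater, then publishes it as the last successful
    write only if no newer write has landed in the meantime:
    
        import org.I0Itec.zkclient.DataUpdater;
    
        // Illustrative holder class; not part of ZkBucketDataAccessor.
        public class VersionHandshakeSketch {
    
          // Idea 2: reserve the next write version (LAST_WRITE). The counter only ever
          // increases, so every writer gets a unique, ordered version slot.
          public static final DataUpdater<byte[]> LAST_WRITE_UPDATER = dataInZk -> {
            if (dataInZk == null || dataInZk.length == 0) {
              return "0".getBytes(); // first ever write starts at version 0
            }
            long next = Long.parseLong(new String(dataInZk)) + 1;
            return String.valueOf(next).getBytes(); // stored as a String for readability in ZK
          };
    
          // Idea 1: optimistic publish of LAST_SUCCESSFUL_WRITE. Advance the pointer only if
          // this writer's version is newer; returning null signals the accessor to skip the
          // write (per the comment in this patch), so a lagging writer never overwrites a
          // newer, already-published version.
          public static DataUpdater<byte[]> lastSuccessfulWriteUpdater(long myVersion) {
            return dataInZk -> {
              if (dataInZk == null || dataInZk.length == 0) {
                return String.valueOf(myVersion).getBytes();
              }
              long lastSuccessful = Long.parseLong(new String(dataInZk));
              return lastSuccessful < myVersion ? String.valueOf(myVersion).getBytes() : null;
            };
          }
        }
    
    The real write path in ZkBucketDataAccessor.compressedBucketWrite follows the same shape
    against ZkBaseDataAccessor.doUpdate/update, and idea 3 then schedules a delayed task that
    deletes any version older than the one just published.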
---
 .../helix/manager/zk/ZkBucketDataAccessor.java     | 431 ++++++++++++---------
 .../helix/manager/zk/TestZkBucketDataAccessor.java | 174 ++++-----
 2 files changed, 307 insertions(+), 298 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
index a11da29..bc13471 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
@@ -19,56 +19,60 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
+import com.google.common.collect.ImmutableMap;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.I0Itec.zkclient.DataUpdater;
 import org.I0Itec.zkclient.exception.ZkMarshallingError;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.AccessOption;
-import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.BucketDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
 import org.apache.helix.manager.zk.client.HelixZkClient;
-import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.util.GZipCompressionUtil;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
-  private static Logger LOG = LoggerFactory.getLogger(ZkBucketDataAccessor.class);
+  private static final Logger LOG = LoggerFactory.getLogger(ZkBucketDataAccessor.class);
 
-  private static final int DEFAULT_NUM_VERSIONS = 2;
+  private static final int DEFAULT_BUCKET_SIZE = 50 * 1024; // 50KB
+  private static final long DEFAULT_VERSION_TTL = TimeUnit.MINUTES.toMillis(1L); // 1 min
   private static final String BUCKET_SIZE_KEY = "BUCKET_SIZE";
   private static final String DATA_SIZE_KEY = "DATA_SIZE";
-  private static final String WRITE_LOCK_KEY = "WRITE_LOCK";
-  private static final String LAST_SUCCESS_KEY = "LAST_SUCCESS";
+  private static final String METADATA_KEY = "METADATA";
+  private static final String LAST_SUCCESSFUL_WRITE_KEY = "LAST_SUCCESSFUL_WRITE";
+  private static final String LAST_WRITE_KEY = "LAST_WRITE";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  // Thread pool for deleting stale versions
+  private static final ScheduledExecutorService GC_THREAD = Executors.newScheduledThreadPool(1);
 
-  // 100 KB for default bucket size
-  private static final int DEFAULT_BUCKET_SIZE = 50 * 1024;
   private final int _bucketSize;
-  private final int _numVersions;
+  private final long _versionTTL;
   private ZkSerializer _zkSerializer;
   private HelixZkClient _zkClient;
-  private HelixZkClient _znRecordClient;
-  private BaseDataAccessor _zkBaseDataAccessor;
-  private BaseDataAccessor<ZNRecord> _znRecordBaseDataAccessor;
+  private ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor;
 
   /**
    * Constructor that allows a custom bucket size.
    * @param zkAddr
    * @param bucketSize
-   * @param numVersions number of versions to store in ZK
+   * @param versionTTL in ms
    */
-  public ZkBucketDataAccessor(String zkAddr, int bucketSize, int numVersions) {
-    // There are two HelixZkClients:
-    // 1. _zkBaseDataAccessor for writes of binary data
-    // 2. _znRecordBaseDataAccessor for writes of ZNRecord (metadata)
+  public ZkBucketDataAccessor(String zkAddr, int bucketSize, long versionTTL) {
     _zkClient = DedicatedZkClientFactory.getInstance()
         .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
     _zkClient.setZkSerializer(new ZkSerializer() {
@@ -85,20 +89,10 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
         return data;
       }
     });
-    _zkBaseDataAccessor = new ZkBaseDataAccessor(_zkClient);
-
-    // TODO: Optimize serialization with Jackson
-    // TODO: Or use a better binary serialization protocol
-    // TODO: Consider making this also binary
-    // TODO: Consider an async write for the metadata as well
-    _znRecordClient = SharedZkClientFactory.getInstance()
-        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
-    _znRecordBaseDataAccessor = new ZkBaseDataAccessor<>(_znRecordClient);
-    _znRecordClient.setZkSerializer(new ZNRecordSerializer());
-
+    _zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
     _zkSerializer = new ZNRecordJacksonSerializer();
     _bucketSize = bucketSize;
-    _numVersions = numVersions;
+    _versionTTL = versionTTL;
   }
 
   /**
@@ -106,82 +100,109 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
    * @param zkAddr
    */
   public ZkBucketDataAccessor(String zkAddr) {
-    this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_NUM_VERSIONS);
+    this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_VERSION_TTL);
   }
 
   @Override
-  public <T extends HelixProperty> boolean compressedBucketWrite(String path, T value)
+  public <T extends HelixProperty> boolean compressedBucketWrite(String rootPath, T value)
       throws IOException {
-    // Take the ZNrecord and serialize it (get byte[])
+    DataUpdater<byte[]> lastWriteVersionUpdater = dataInZk -> {
+      if (dataInZk == null || dataInZk.length == 0) {
+        // No last write version exists, so start with 0
+        return "0".getBytes();
+      }
+      // Last write exists, so increment and write it back
+      // **String conversion is necessary to make it display in ZK (zooinspector)**
+      String lastWriteVersionStr = new String(dataInZk);
+      long lastWriteVersion = Long.parseLong(lastWriteVersionStr);
+      lastWriteVersion++;
+      return String.valueOf(lastWriteVersion).getBytes();
+    };
+
+    // 1. Increment lastWriteVersion using DataUpdater
+    ZkBaseDataAccessor.AccessResult result = _zkBaseDataAccessor.doUpdate(
+        rootPath + "/" + LAST_WRITE_KEY, lastWriteVersionUpdater, AccessOption.PERSISTENT);
+    if (result._retCode != ZkBaseDataAccessor.RetCode.OK) {
+      throw new HelixException(
+          String.format("Failed to write the write version at path: %s!", rootPath));
+    }
+
+    // Successfully reserved a version number
+    byte[] binaryVersion = (byte[]) result._updatedValue;
+    String versionStr = new String(binaryVersion);
+    final long version = Long.parseLong(versionStr);
+
+    // 2. Write to the incremented last write version
+    String versionedDataPath = rootPath + "/" + versionStr;
+
+    // Take the ZNRecord and serialize it (get byte[])
     byte[] serializedRecord = _zkSerializer.serialize(value.getRecord());
     // Compress the byte[]
     byte[] compressedRecord = GZipCompressionUtil.compress(serializedRecord);
     // Compute N - number of buckets
     int numBuckets = (compressedRecord.length + _bucketSize - 1) / _bucketSize;
 
-    if (tryLock(path)) {
-      try {
-        // Read or initialize metadata and compute the last success version index
-        ZNRecord metadataRecord =
-            _znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
-        if (metadataRecord == null) {
-          metadataRecord = new ZNRecord(extractIdFromPath(path));
-        }
-        int lastSuccessIndex =
-            (metadataRecord.getIntField(LAST_SUCCESS_KEY, -1) + 1) % _numVersions;
-        String dataPath = path + "/" + lastSuccessIndex;
-
-        List<String> paths = new ArrayList<>();
-        List<Object> buckets = new ArrayList<>();
-
-        int ptr = 0;
-        int counter = 0;
-        while (counter < numBuckets) {
-          paths.add(dataPath + "/" + counter);
-          if (counter == numBuckets - 1) {
-            // Special treatment for the last bucket
-            buckets.add(Arrays.copyOfRange(compressedRecord, ptr,
-                ptr + compressedRecord.length % _bucketSize));
-          } else {
-            buckets.add(Arrays.copyOfRange(compressedRecord, ptr, ptr + _bucketSize));
-          }
-          ptr += _bucketSize;
-          counter++;
-        }
-
-        // Do a cleanup of previous data
-        if (!_zkBaseDataAccessor.remove(dataPath, AccessOption.PERSISTENT)) {
-          // Clean-up is not critical so upon failure, we log instead of throwing an exception
-          LOG.warn("Failed to clean up previous bucketed data in data path: {}", dataPath);
-        }
+    List<String> paths = new ArrayList<>();
+    List<byte[]> buckets = new ArrayList<>();
+
+    int ptr = 0;
+    int counter = 0;
+    while (counter < numBuckets) {
+      paths.add(versionedDataPath + "/" + counter);
+      if (counter == numBuckets - 1) {
+        // Special treatment for the last bucket
+        buckets.add(
+            Arrays.copyOfRange(compressedRecord, ptr, ptr + compressedRecord.length % _bucketSize));
+      } else {
+        buckets.add(Arrays.copyOfRange(compressedRecord, ptr, ptr + _bucketSize));
+      }
+      ptr += _bucketSize;
+      counter++;
+    }
 
-        // Do an async set to ZK
-        boolean[] success =
-            _zkBaseDataAccessor.setChildren(paths, buckets, AccessOption.PERSISTENT);
-        // Exception and fail the write if any failed
-        for (boolean s : success) {
-          if (!s) {
-            throw new HelixException(
-                String.format("Failed to write the data buckets for path: %s", path));
-          }
-        }
+    // 3. Include the metadata in the batch write
+    Map<String, String> metadata = ImmutableMap.of(BUCKET_SIZE_KEY, Integer.toString(_bucketSize),
+        DATA_SIZE_KEY, Integer.toString(compressedRecord.length));
+    byte[] binaryMetadata = OBJECT_MAPPER.writeValueAsBytes(metadata);
+    paths.add(versionedDataPath + "/" + METADATA_KEY);
+    buckets.add(binaryMetadata);
+
+    // Do an async set to ZK
+    boolean[] success = _zkBaseDataAccessor.setChildren(paths, buckets, AccessOption.PERSISTENT);
+    // Exception and fail the write if any failed
+    for (boolean s : success) {
+      if (!s) {
+        throw new HelixException(
+            String.format("Failed to write the data buckets for path: %s", rootPath));
+      }
+    }
 
-        // Data write completed, so update the metadata with last success index
-        // Note that the metadata ZNodes is written using sync write
-        metadataRecord.setIntField(BUCKET_SIZE_KEY, _bucketSize);
-        metadataRecord.setLongField(DATA_SIZE_KEY, compressedRecord.length);
-        metadataRecord.setIntField(LAST_SUCCESS_KEY, lastSuccessIndex);
-        if (!_znRecordBaseDataAccessor.set(path, metadataRecord, AccessOption.PERSISTENT)) {
-          throw new HelixException(
-              String.format("Failed to write the metadata at path: %s!", path));
-        }
-      } finally {
-        // Critical section for write ends here
-        unlock(path);
+    // 4. Update lastSuccessfulWriteVersion using Updater
+    DataUpdater<byte[]> lastSuccessfulWriteVersionUpdater = dataInZk -> {
+      if (dataInZk == null || dataInZk.length == 0) {
+        // No last write version exists, so write version from this write
+        return versionStr.getBytes();
       }
-      return true;
+      // Last successful write exists so check if it's smaller than my number
+      String lastWriteVersionStr = new String(dataInZk);
+      long lastWriteVersion = Long.parseLong(lastWriteVersionStr);
+      if (lastWriteVersion < version) {
+        // Smaller, so I can overwrite
+        return versionStr.getBytes();
+      } else {
+        // Greater, I have lagged behind. Return null and do not write
+        return null;
+      }
+    };
+    if (!_zkBaseDataAccessor.update(rootPath + "/" + LAST_SUCCESSFUL_WRITE_KEY,
+        lastSuccessfulWriteVersionUpdater, AccessOption.PERSISTENT)) {
+      throw new HelixException(String
+          .format("Failed to write the last successful write metadata at path: %s!", rootPath));
     }
-    throw new HelixException(String.format("Could not acquire lock for write. Path: %s", path));
+
+    // 5. Update the timer for GC
+    updateGCTimer(rootPath, versionStr);
+    return true;
   }
 
   @Override
@@ -202,126 +223,158 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
     if (!_zkClient.isClosed()) {
       _zkClient.close();
     }
-    if (!_znRecordClient.isClosed()) {
-      _znRecordClient.close();
-    }
   }
 
   private HelixProperty compressedBucketRead(String path) {
-    // TODO: Incorporate parallelism into reads instead of locking the whole thing against other
-    // reads and writes
-    if (tryLock(path)) {
-      try {
-        // Retrieve the metadata
-        ZNRecord metadataRecord =
-            _znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
-        if (metadataRecord == null) {
-          throw new ZkNoNodeException(
-              String.format("Metadata ZNRecord does not exist for path: %s", path));
-        }
+    // 1. Get the version to read
+    byte[] binaryVersionToRead = _zkBaseDataAccessor.get(path + "/" + LAST_SUCCESSFUL_WRITE_KEY,
+        null, AccessOption.PERSISTENT);
+    if (binaryVersionToRead == null) {
+      throw new ZkNoNodeException(
+          String.format("Last successful write ZNode does not exist for path: %s", path));
+    }
+    String versionToRead = new String(binaryVersionToRead);
+
+    // 2. Get the metadata map
+    byte[] binaryMetadata = _zkBaseDataAccessor.get(path + "/" + versionToRead + "/" + METADATA_KEY,
+        null, AccessOption.PERSISTENT);
+    if (binaryMetadata == null) {
+      throw new ZkNoNodeException(
+          String.format("Metadata ZNode does not exist for path: %s", path));
+    }
+    Map metadata;
+    try {
+      metadata = OBJECT_MAPPER.readValue(binaryMetadata, Map.class);
+    } catch (IOException e) {
+      throw new HelixException(String.format("Failed to deserialize path metadata: %s!", path), e);
+    }
 
-        int bucketSize = metadataRecord.getIntField(BUCKET_SIZE_KEY, -1);
-        int dataSize = metadataRecord.getIntField(DATA_SIZE_KEY, -1);
-        int lastSuccessIndex = metadataRecord.getIntField(LAST_SUCCESS_KEY, -1);
-        if (lastSuccessIndex == -1) {
-          throw new HelixException(String.format("Metadata ZNRecord does not have %s! Path: %s",
-              LAST_SUCCESS_KEY, path));
-        }
-        if (bucketSize == -1) {
-          throw new HelixException(
-              String.format("Metadata ZNRecord does not have %s! Path: %s", BUCKET_SIZE_KEY, path));
-        }
-        if (dataSize == -1) {
-          throw new HelixException(
-              String.format("Metadata ZNRecord does not have %s! Path: %s", DATA_SIZE_KEY, path));
-        }
+    // 3. Read the data
+    Object bucketSizeObj = metadata.get(BUCKET_SIZE_KEY);
+    Object dataSizeObj = metadata.get(DATA_SIZE_KEY);
+    if (bucketSizeObj == null) {
+      throw new HelixException(
+          String.format("Metadata ZNRecord does not have %s! Path: %s", BUCKET_SIZE_KEY, path));
+    }
+    if (dataSizeObj == null) {
+      throw new HelixException(
+          String.format("Metadata ZNRecord does not have %s! Path: %s", DATA_SIZE_KEY, path));
+    }
+    int bucketSize = Integer.parseInt((String) bucketSizeObj);
+    int dataSize = Integer.parseInt((String) dataSizeObj);
 
-        // Compute N - number of buckets
-        int numBuckets = (dataSize + _bucketSize - 1) / _bucketSize;
-        byte[] compressedRecord = new byte[dataSize];
-        String dataPath = path + "/" + lastSuccessIndex;
+    // Compute N - number of buckets
+    int numBuckets = (dataSize + _bucketSize - 1) / _bucketSize;
+    byte[] compressedRecord = new byte[dataSize];
+    String dataPath = path + "/" + versionToRead;
 
-        List<String> paths = new ArrayList<>();
-        for (int i = 0; i < numBuckets; i++) {
-          paths.add(dataPath + "/" + i);
-        }
+    List<String> paths = new ArrayList<>();
+    for (int i = 0; i < numBuckets; i++) {
+      paths.add(dataPath + "/" + i);
+    }
 
-        // Async get
-        List buckets = _zkBaseDataAccessor.get(paths, null, AccessOption.PERSISTENT, true);
-
-        // Combine buckets into one byte array
-        int copyPtr = 0;
-        for (int i = 0; i < numBuckets; i++) {
-          if (i == numBuckets - 1) {
-            // Special treatment for the last bucket
-            System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, dataSize % bucketSize);
-          } else {
-            System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, bucketSize);
-            copyPtr += bucketSize;
-          }
-        }
+    // Async get
+    List<byte[]> buckets = _zkBaseDataAccessor.get(paths, null, AccessOption.PERSISTENT, true);
+
+    // Combine buckets into one byte array
+    int copyPtr = 0;
+    for (int i = 0; i < numBuckets; i++) {
+      if (i == numBuckets - 1) {
+        // Special treatment for the last bucket
+        System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, dataSize % bucketSize);
+      } else {
+        System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, bucketSize);
+        copyPtr += bucketSize;
+      }
+    }
 
-        // Decompress the byte array
-        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressedRecord);
-        byte[] serializedRecord;
-        try {
-          serializedRecord = GZipCompressionUtil.uncompress(byteArrayInputStream);
-        } catch (IOException e) {
-          throw new HelixException(String.format("Failed to decompress path: %s!", path), e);
-        }
+    // Decompress the byte array
+    ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressedRecord);
+    byte[] serializedRecord;
+    try {
+      serializedRecord = GZipCompressionUtil.uncompress(byteArrayInputStream);
+    } catch (IOException e) {
+      throw new HelixException(String.format("Failed to decompress path: %s!", path), e);
+    }
+
+    // Deserialize the record to retrieve the original
+    ZNRecord originalRecord = (ZNRecord) _zkSerializer.deserialize(serializedRecord);
+    return new HelixProperty(originalRecord);
+  }
+
+  @Override
+  public void close() {
+    disconnect();
+  }
 
-        // Deserialize the record to retrieve the original
-        ZNRecord originalRecord = (ZNRecord) _zkSerializer.deserialize(serializedRecord);
-        return new HelixProperty(originalRecord);
-      } finally {
-        // Critical section for read ends here
-        unlock(path);
+  private void updateGCTimer(String rootPath, String currentVersion) {
+    TimerTask gcTask = new TimerTask() {
+      @Override
+      public void run() {
+        deleteStaleVersions(rootPath, currentVersion);
       }
-    }
-    throw new HelixException(String.format("Could not acquire lock for read. Path: %s", path));
+    };
+
+    // Schedule the gc task with TTL
+    GC_THREAD.schedule(gcTask, _versionTTL, TimeUnit.MILLISECONDS);
   }
 
   /**
-   * Returns the last string element in a split String array by /.
-   * @param path
-   * @return
+   * Deletes all stale versions.
+   * @param rootPath
+   * @param currentVersion
    */
-  private String extractIdFromPath(String path) {
-    String[] splitPath = path.split("/");
-    return splitPath[splitPath.length - 1];
+  private void deleteStaleVersions(String rootPath, String currentVersion) {
+    // Get all children names under path
+    List<String> children = _zkBaseDataAccessor.getChildNames(rootPath, AccessOption.PERSISTENT);
+    if (children == null || children.isEmpty()) {
+      // The whole path has been deleted so return immediately
+      return;
+    }
+    filterChildrenNames(children, currentVersion);
+    List<String> pathsToDelete = getPathsToDelete(rootPath, children);
+    for (String pathToDelete : pathsToDelete) {
+      // TODO: Should be batch delete but it doesn't work. It's okay since this runs async
+      _zkBaseDataAccessor.remove(pathToDelete, AccessOption.PERSISTENT);
+    }
   }
 
   /**
-   * Acquires the lock (create an ephemeral node) only if it is free (no ephemeral node already
-   * exists) at the time of invocation.
-   * @param path
-   * @return
+   * Filter out non-version children names and non-stale versions.
+   * @param children
    */
-  private boolean tryLock(String path) {
-    // Check if another write is taking place and if not, create an ephemeral node to simulate
-    // acquiring of a lock
-    return !_zkBaseDataAccessor.exists(path + "/" + WRITE_LOCK_KEY, AccessOption.EPHEMERAL)
-        && _zkBaseDataAccessor.set(path + "/" + WRITE_LOCK_KEY, new byte[0],
-            AccessOption.EPHEMERAL);
+  private void filterChildrenNames(List<String> children, String currentVersion) {
+    // Leave out metadata
+    children.remove(LAST_SUCCESSFUL_WRITE_KEY);
+    children.remove(LAST_WRITE_KEY);
+
+    // Leave out currentVersion and above
+    // This is because we want to honor the TTL for newer versions
+    children.remove(currentVersion);
+    long currentVer = Long.parseLong(currentVersion);
+    for (String child : new ArrayList<>(children)) {
+      try {
+        long version = Long.parseLong(child);
+        if (version >= currentVer) {
+          children.remove(child);
+        }
+      } catch (Exception e) {
+        // Ignore ZNode names that aren't parseable
+        children.remove(child);
+        LOG.debug("Found an invalid ZNode: {}", child);
+      }
+    }
   }
 
   /**
-   * Releases the lock (removes the ephemeral node).
+   * Generates all stale paths to delete.
    * @param path
+   * @param staleVersions
+   * @return
    */
-  private void unlock(String path) {
-    // Write succeeded, so release the lock
-    if (!_zkBaseDataAccessor.remove(path + "/" + WRITE_LOCK_KEY, AccessOption.EPHEMERAL)) {
-      throw new HelixException(String.format("Could not remove ephemeral node for path: %s", path));
-    }
-    // TODO: In case of remove failure, we risk a lock never getting released.
-    // TODO: Consider two possible improvements
-    // TODO: 1. Use ephemeral owner id for the same connection to reclaim the lock
-    // TODO: 2. Use "lease" - lock with a timeout
-  }
-
-  @Override
-  public void close() throws Exception {
-    disconnect();
+  private List<String> getPathsToDelete(String path, List<String> staleVersions) {
+    List<String> pathsToDelete = new ArrayList<>();
+    staleVersions.forEach(ver -> pathsToDelete.add(path + "/" + ver));
+    return pathsToDelete;
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java
index 4c28835..c7b5cbf 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java
@@ -27,35 +27,64 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.BucketDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 public class TestZkBucketDataAccessor extends ZkTestBase {
-
   private static final String PATH = "/" + TestHelper.getTestClassName();
   private static final String NAME_KEY = TestHelper.getTestClassName();
-  private static final String LAST_SUCCESS_KEY = "LAST_SUCCESS";
-  private static final String BUCKET_SIZE_KEY = "BUCKET_SIZE";
-  private static final String WRITE_LOCK_KEY = "WRITE_LOCK";
+  private static final String LAST_SUCCESSFUL_WRITE_KEY = "LAST_SUCCESSFUL_WRITE";
+  private static final String LAST_WRITE_KEY = "LAST_WRITE";
 
   // Populate list and map fields for content comparison
   private static final List<String> LIST_FIELD = ImmutableList.of("1", "2");
   private static final Map<String, String> MAP_FIELD = ImmutableMap.of("1", "2");
 
   private BucketDataAccessor _bucketDataAccessor;
+  private BaseDataAccessor<byte[]> _zkBaseDataAccessor;
+
+  private ZNRecord record = new ZNRecord(NAME_KEY);
 
   @BeforeClass
   public void beforeClass() {
-    _bucketDataAccessor = new ZkBucketDataAccessor(ZK_ADDR);
+    // Initialize ZK accessors for testing
+    _bucketDataAccessor = new ZkBucketDataAccessor(ZK_ADDR, 50 * 1024, 0L);
+    HelixZkClient zkClient = DedicatedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
+    zkClient.setZkSerializer(new ZkSerializer() {
+      @Override
+      public byte[] serialize(Object data) throws ZkMarshallingError {
+        if (data instanceof byte[]) {
+          return (byte[]) data;
+        }
+        throw new HelixException("ZkBucketDataAccessor only supports a byte array as an argument!");
+      }
+
+      @Override
+      public Object deserialize(byte[] data) throws ZkMarshallingError {
+        return data;
+      }
+    });
+    _zkBaseDataAccessor = new ZkBaseDataAccessor<>(zkClient);
+
+    // Fill in some data for the record
+    record.setSimpleField(NAME_KEY, NAME_KEY);
+    record.setListField(NAME_KEY, LIST_FIELD);
+    record.setMapField(NAME_KEY, MAP_FIELD);
   }
 
   @AfterClass
@@ -69,17 +98,42 @@ public class TestZkBucketDataAccessor extends ZkTestBase {
    */
   @Test
   public void testCompressedBucketWrite() throws IOException {
-    ZNRecord record = new ZNRecord(NAME_KEY);
-    record.setSimpleField(NAME_KEY, NAME_KEY);
-    record.setListField(NAME_KEY, LIST_FIELD);
-    record.setMapField(NAME_KEY, MAP_FIELD);
     Assert.assertTrue(_bucketDataAccessor.compressedBucketWrite(PATH, new HelixProperty(record)));
   }
 
+  @Test(dependsOnMethods = "testCompressedBucketWrite")
+  public void testMultipleWrites() throws Exception {
+    int count = 50;
+
+    // Write "count" times
+    for (int i = 0; i < count; i++) {
+      _bucketDataAccessor.compressedBucketWrite(PATH, new HelixProperty(record));
+    }
+
+    // Last known good version number should be "count"
+    byte[] binarySuccessfulWriteVer = _zkBaseDataAccessor
+        .get(PATH + "/" + LAST_SUCCESSFUL_WRITE_KEY, null, AccessOption.PERSISTENT);
+    long lastSuccessfulWriteVer = Long.parseLong(new String(binarySuccessfulWriteVer));
+    Assert.assertEquals(lastSuccessfulWriteVer, count);
+
+    // Last write version should be "count"
+    byte[] binaryWriteVer =
+        _zkBaseDataAccessor.get(PATH + "/" + LAST_WRITE_KEY, null, AccessOption.PERSISTENT);
+    long writeVer = Long.parseLong(new String(binaryWriteVer));
+    Assert.assertEquals(writeVer, count);
+
+    // Test that all previous versions have been deleted
+    // Use Verifier because GC can take ZK delay
+    Assert.assertTrue(TestHelper.verify(() -> {
+      List<String> children = _zkBaseDataAccessor.getChildNames(PATH, AccessOption.PERSISTENT);
+      return children.size() == 3;
+    }, 60 * 1000L));
+  }
+
   /**
    * The record written in {@link #testCompressedBucketWrite()} is the same record that was written.
    */
-  @Test(dependsOnMethods = "testCompressedBucketWrite")
+  @Test(dependsOnMethods = "testMultipleWrites")
   public void testCompressedBucketRead() {
     HelixProperty readRecord = _bucketDataAccessor.compressedBucketRead(PATH, HelixProperty.class);
     Assert.assertEquals(readRecord.getRecord().getSimpleField(NAME_KEY), NAME_KEY);
@@ -89,42 +143,9 @@ public class TestZkBucketDataAccessor extends ZkTestBase {
   }
 
   /**
-   * Do 10 writes and check that there are 5 versions of the data.
-   */
-  @Test(dependsOnMethods = "testCompressedBucketRead")
-  public void testManyWritesWithVersionCounts() throws IOException {
-    int bucketSize = 50 * 1024;
-    int numVersions = 5;
-    int expectedLastSuccessfulIndex = 4;
-    String path = PATH + "2";
-    ZNRecord record = new ZNRecord(NAME_KEY);
-    record.setSimpleField(NAME_KEY, NAME_KEY);
-    record.setListField(NAME_KEY, LIST_FIELD);
-    record.setMapField(NAME_KEY, MAP_FIELD);
-
-    BucketDataAccessor bucketDataAccessor =
-        new ZkBucketDataAccessor(ZK_ADDR, bucketSize, numVersions);
-    for (int i = 0; i < 10; i++) {
-      bucketDataAccessor.compressedBucketWrite(path, new HelixProperty(record));
-    }
-
-    // Check that there are numVersions number of children under path
-    List<String> children = _baseAccessor.getChildNames(path, AccessOption.PERSISTENT);
-    Assert.assertEquals(children.size(), numVersions);
-
-    // Check that last successful index is 4 (since we did 10 writes)
-    ZNRecord metadata = _baseAccessor.get(path, null, AccessOption.PERSISTENT);
-    Assert.assertEquals(metadata.getIntField(LAST_SUCCESS_KEY, -1), expectedLastSuccessfulIndex);
-
-    // Clean up
-    bucketDataAccessor.compressedBucketDelete(path);
-    bucketDataAccessor.disconnect();
-  }
-
-  /**
    * Write a HelixProperty with large number of entries using BucketDataAccessor and read it back.
    */
-  @Test(dependsOnMethods = "testManyWritesWithVersionCounts")
+  @Test(dependsOnMethods = "testCompressedBucketRead")
   public void testLargeWriteAndRead() throws IOException {
     String name = "largeResourceAssignment";
     HelixProperty property = createLargeHelixProperty(name, 100000);
@@ -146,71 +167,6 @@ public class TestZkBucketDataAccessor extends ZkTestBase {
     Assert.assertEquals(readRecord, property);
   }
 
-  /**
-   * Tests that each write cleans up previous bucketed data. This method writes some small amount of
-   * data and checks that the data buckets from the large write performed in the previous test
-   * method have been cleaned up.
-   * @throws IOException
-   */
-  @Test(dependsOnMethods = "testLargeWriteAndRead")
-  public void testCleanupBeforeWrite() throws IOException {
-    // Create a HelixProperty of a very small size with the same name as the large HelixProperty
-    // created from the previous method
-    String name = "largeResourceAssignment";
-    HelixProperty property = new HelixProperty(name);
-    property.getRecord().setIntField("Hi", 10);
-
-    // Get the bucket count from the write performed in the previous method
-    ZNRecord metadata = _baseAccessor.get("/" + name, null, AccessOption.PERSISTENT);
-    int origBucketSize = metadata.getIntField(BUCKET_SIZE_KEY, -1);
-
-    // Perform a write twice to overwrite both versions
-    _bucketDataAccessor.compressedBucketWrite("/" + name, property);
-    _bucketDataAccessor.compressedBucketWrite("/" + name, property);
-
-    // Check that the children count for version 0 (version for the large write) is 1
-    Assert.assertEquals(
-        _baseAccessor.getChildNames("/" + name + "/0", AccessOption.PERSISTENT).size(), 1);
-
-    // Clean up
-    _bucketDataAccessor.compressedBucketDelete("/" + name);
-  }
-
-  /**
-   * Test that no concurrent reads and writes are allowed by triggering multiple operations after
-   * creating an artificial lock.
-   * @throws IOException
-   */
-  @Test(dependsOnMethods = "testCleanupBeforeWrite")
-  public void testFailureToAcquireLock() throws Exception {
-    String name = "acquireLock";
-    // Use a large HelixProperty to simulate a write that keeps the lock for some time
-    HelixProperty property = createLargeHelixProperty(name, 100);
-
-    // Artificially create the ephemeral ZNode
-    _baseAccessor.create("/" + name + "/" + WRITE_LOCK_KEY, new ZNRecord(name),
-        AccessOption.EPHEMERAL);
-
-    // Test write
-    try {
-      _bucketDataAccessor.compressedBucketWrite("/" + name, property);
-      Assert.fail("Should fail due to an already-existing lock ZNode!");
-    } catch (HelixException e) {
-      // Expect an exception
-    }
-
-    // Test read
-    try {
-      _bucketDataAccessor.compressedBucketRead("/" + name, HelixProperty.class);
-      Assert.fail("Should fail due to an already-existing lock ZNode!");
-    } catch (HelixException e) {
-      // Expect an exception
-    }
-
-    // Clean up
-    _bucketDataAccessor.compressedBucketDelete("/" + name);
-  }
-
   private HelixProperty createLargeHelixProperty(String name, int numEntries) {
     HelixProperty property = new HelixProperty(name);
     for (int i = 0; i < numEntries; i++) {