You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/08/13 19:43:50 UTC

[helix] branch master updated: Extend the task future tracking to a Map instead of a single object. (#1263)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 46e9227  Extend the task future tracking to a Map instead of a single object. (#1263)
46e9227 is described below

commit 46e9227a476ccf8c68735d85943fb2835f038e54
Author: Jiajun Wang <jj...@linkedin.com>
AuthorDate: Thu Aug 13 12:43:42 2020 -0700

    Extend the task future tracking to a Map instead of a single object. (#1263)
    
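    Since a single ZkBucketDataAccessor instance is used to access multiple paths, tracking only one GC task future is not enough: scheduling the task for one path would cancel the pending task for another. This change uses a map, keyed by path, to track the scheduled task for each path separately.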
    It also enhances the test case to cover the scenario in which multiple paths are accessed within a short period of time.
---
 .../helix/manager/zk/ZkBucketDataAccessor.java     | 14 ++--
 .../helix/manager/zk/TestZkBucketDataAccessor.java | 74 ++++++++++++----------
 2 files changed, 51 insertions(+), 37 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
index 7d3b37b..51d96e9 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
@@ -23,6 +23,7 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executors;
@@ -73,7 +74,7 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
   private ZkSerializer _zkSerializer;
   private RealmAwareZkClient _zkClient;
   private ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor;
-  private ScheduledFuture _gcTaskFuture = null;
+  private Map<String, ScheduledFuture> _gcTaskFutureMap = new HashMap<>();
 
   /**
    * Constructor that allows a custom bucket size.
@@ -239,6 +240,9 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
     if (!_zkBaseDataAccessor.remove(path, AccessOption.PERSISTENT)) {
       throw new HelixException(String.format("Failed to delete the bucket data! Path: %s", path));
     }
+    synchronized (this) {
+      _gcTaskFutureMap.remove(path);
+    }
   }
 
   @Override
@@ -337,17 +341,17 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
   }
 
   private synchronized void updateGCTimer(String rootPath, long currentVersion) {
-    if (_gcTaskFuture != null) {
-      _gcTaskFuture.cancel(false);
+    if (_gcTaskFutureMap.containsKey(rootPath)) {
+      _gcTaskFutureMap.remove(rootPath).cancel(false);
     }
     // Schedule the gc task with TTL
-    _gcTaskFuture = GC_THREAD.schedule(() -> {
+    _gcTaskFutureMap.put(rootPath, GC_THREAD.schedule(() -> {
       try {
         deleteStaleVersions(rootPath, currentVersion);
       } catch (Exception ex) {
         LOG.error("Failed to delete the stale versions.", ex);
       }
-    }, _versionTTLms, TimeUnit.MILLISECONDS);
+    }, _versionTTLms, TimeUnit.MILLISECONDS));
   }
 
   /**
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java
index 904204e..cef5d6b 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java
@@ -109,48 +109,58 @@ public class TestZkBucketDataAccessor extends ZkTestBase {
     // Otherwise the nodes named with version number will be ordered in a different alphabet order.
     // This might hide some bugs in the GC code。
     int count = 5;
+    int pathCount = 2;
 
     Assert.assertTrue(VERSION_TTL_MS > 100,
         "This test should be executed with the TTL more than 100ms.");
-    // Write "count" times
-    for (int i = 0; i < count; i++) {
-      _bucketDataAccessor.compressedBucketWrite(PATH, new HelixProperty(record));
-    }
 
-    // Last known good version number should be "count"
-    byte[] binarySuccessfulWriteVer = _zkBaseDataAccessor
-        .get(PATH + "/" + LAST_SUCCESSFUL_WRITE_KEY, null, AccessOption.PERSISTENT);
-    long lastSuccessfulWriteVer = Long.parseLong(new String(binarySuccessfulWriteVer));
-    Assert.assertEquals(lastSuccessfulWriteVer, count);
-
-    // Last write version should be "count"
-    byte[] binaryWriteVer =
-        _zkBaseDataAccessor.get(PATH + "/" + LAST_WRITE_KEY, null, AccessOption.PERSISTENT);
-    long writeVer = Long.parseLong(new String(binaryWriteVer));
-    Assert.assertEquals(writeVer, count);
-
-    // Test that all previous versions have been deleted
-    // Use Verifier because GC can take ZK delay
-    Assert.assertTrue(TestHelper.verify(() -> {
-      List<String> children = _zkBaseDataAccessor.getChildNames(PATH, AccessOption.PERSISTENT);
-      return children.size() == 3 && children.containsAll(ImmutableList
-          .of(LAST_SUCCESSFUL_WRITE_KEY, LAST_WRITE_KEY,
-              new Long(lastSuccessfulWriteVer).toString()));
-    }, VERSION_TTL_MS * 2));
-
-    // Wait one more TTL to ensure that the GC has been done.
-    Thread.sleep(VERSION_TTL_MS);
-    List<String> children = _zkBaseDataAccessor.getChildNames(PATH, AccessOption.PERSISTENT);
-    Assert.assertTrue(children.size() == 3 && children.containsAll(ImmutableList
-        .of(LAST_SUCCESSFUL_WRITE_KEY, LAST_WRITE_KEY,
-            new Long(lastSuccessfulWriteVer).toString())));
+    try {
+      // Write "count + 1" times, so the latest version will be "count"
+      for (int i = 0; i < count + 1; i++) {
+        for (int j = 0; j < pathCount; j++) {
+          _bucketDataAccessor.compressedBucketWrite(PATH + j, new HelixProperty(record));
+        }
+      }
+
+      for (int j = 0; j < pathCount; j++) {
+        String path = PATH + j;
+        // Last known good version number should be "count"
+        byte[] binarySuccessfulWriteVer = _zkBaseDataAccessor.get(path + "/" + LAST_SUCCESSFUL_WRITE_KEY, null, AccessOption.PERSISTENT);
+        long lastSuccessfulWriteVer = Long.parseLong(new String(binarySuccessfulWriteVer));
+        Assert.assertEquals(lastSuccessfulWriteVer, count);
+
+        // Last write version should be "count"
+        byte[] binaryWriteVer = _zkBaseDataAccessor.get(path + "/" + LAST_WRITE_KEY, null, AccessOption.PERSISTENT);
+        long writeVer = Long.parseLong(new String(binaryWriteVer));
+        Assert.assertEquals(writeVer, count);
+
+        // Test that all previous versions have been deleted
+        // Use Verifier because GC can take ZK delay
+        Assert.assertTrue(TestHelper.verify(() -> {
+          List<String> children = _zkBaseDataAccessor.getChildNames(path, AccessOption.PERSISTENT);
+          return children.size() == 3 && children.containsAll(ImmutableList
+              .of(LAST_SUCCESSFUL_WRITE_KEY, LAST_WRITE_KEY, new Long(lastSuccessfulWriteVer).toString()));
+        }, VERSION_TTL_MS * 2));
+
+        // Wait one more TTL to ensure that the GC has been done.
+        Thread.sleep(VERSION_TTL_MS);
+        List<String> children = _zkBaseDataAccessor.getChildNames(path, AccessOption.PERSISTENT);
+        Assert.assertTrue(children.size() == 3 && children.containsAll(ImmutableList
+            .of(LAST_SUCCESSFUL_WRITE_KEY, LAST_WRITE_KEY, new Long(lastSuccessfulWriteVer).toString())));
+      }
+    } finally {
+      for (int j = 0; j < pathCount; j++) {
+        _bucketDataAccessor.compressedBucketDelete(PATH + j);
+      }
+    }
   }
 
   /**
    * The record written in {@link #testCompressedBucketWrite()} is the same record that was written.
    */
   @Test(dependsOnMethods = "testMultipleWrites")
-  public void testCompressedBucketRead() {
+  public void testCompressedBucketRead() throws IOException {
+    _bucketDataAccessor.compressedBucketWrite(PATH, new HelixProperty(record));
     HelixProperty readRecord = _bucketDataAccessor.compressedBucketRead(PATH, HelixProperty.class);
     Assert.assertEquals(readRecord.getRecord().getSimpleField(NAME_KEY), NAME_KEY);
     Assert.assertEquals(readRecord.getRecord().getListField(NAME_KEY), LIST_FIELD);