You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/06/23 00:04:24 UTC

[helix] branch master updated: Fix ZkBucketDataAccessor failure due to concurrent modification. (#1107)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 2be97cd  Fix ZkBucketDataAccessor failure due to concurrent modification. (#1107)
2be97cd is described below

commit 2be97cd08a020ae4c722ce12dea34acfed71555c
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Mon Jun 22 17:04:16 2020 -0700

    Fix ZkBucketDataAccessor failure due to concurrent modification. (#1107)
    
    Concurrent modification causes two issues.
    1. The regular GC task fails due to concurrent modification of the list, so stale versions are never removed.
    2. If, by coincidence, the list contains a version newer than the current version, then, because the list is modified inside the loop, the final element (the newer version) is never checked and is left in the to-be-removed list. The GC task then removes the most recent version. For example:
      a) Input, current version "2"
      b) Children = [1, 2, 3]
      c) The task avoids checking "2", so the list for loop is: [1, 3]
      d) When checking "1", it is removed from the list, so the list becomes [3]. The loop then ends, because from the iterator's perspective the first position has already been visited.
      e) The version to be removed is "3"!
    
    This PR fixes the issue by no longer modifying the list while iterating over it. It also simplifies the logic to reduce the number of pending GC tasks: scheduling a new GC task cancels the previously scheduled one.
    The test is updated accordingly.
---
 .../helix/manager/zk/ZkBucketDataAccessor.java     | 76 ++++++++++++----------
 .../helix/manager/zk/TestZkBucketDataAccessor.java | 32 ++++++---
 2 files changed, 65 insertions(+), 43 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
index 2eda09b..7f2e748 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
@@ -25,9 +25,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.TimerTask;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.ImmutableMap;
@@ -69,18 +69,19 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
       Executors.newSingleThreadScheduledExecutor();
 
   private final int _bucketSize;
-  private final long _versionTTL;
+  private final long _versionTTLms;
   private ZkSerializer _zkSerializer;
   private RealmAwareZkClient _zkClient;
   private ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor;
+  private ScheduledFuture _gcTaskFuture = null;
 
   /**
    * Constructor that allows a custom bucket size.
    * @param zkAddr
    * @param bucketSize
-   * @param versionTTL in ms
+   * @param versionTTLms in ms
    */
-  public ZkBucketDataAccessor(String zkAddr, int bucketSize, long versionTTL) {
+  public ZkBucketDataAccessor(String zkAddr, int bucketSize, long versionTTLms) {
     if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
       try {
         // Create realm-aware ZkClient.
@@ -114,7 +115,7 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
     _zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
     _zkSerializer = new ZNRecordJacksonSerializer();
     _bucketSize = bucketSize;
-    _versionTTL = versionTTL;
+    _versionTTLms = versionTTLms;
   }
 
   /**
@@ -223,7 +224,7 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
     }
 
     // 5. Update the timer for GC
-    updateGCTimer(rootPath, versionStr);
+    updateGCTimer(rootPath, version);
     return true;
   }
 
@@ -329,16 +330,18 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
     disconnect();
   }
 
-  private void updateGCTimer(String rootPath, String currentVersion) {
-    TimerTask gcTask = new TimerTask() {
-      @Override
-      public void run() {
+  private synchronized void updateGCTimer(String rootPath, long currentVersion) {
+    if (_gcTaskFuture != null) {
+      _gcTaskFuture.cancel(false);
+    }
+    // Schedule the gc task with TTL
+    _gcTaskFuture = GC_THREAD.schedule(() -> {
+      try {
         deleteStaleVersions(rootPath, currentVersion);
+      } catch (Exception ex) {
+        LOG.error("Failed to delete the stale versions.", ex);
       }
-    };
-
-    // Schedule the gc task with TTL
-    GC_THREAD.schedule(gcTask, _versionTTL, TimeUnit.MILLISECONDS);
+    }, _versionTTLms, TimeUnit.MILLISECONDS);
   }
 
   /**
@@ -346,15 +349,15 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
    * @param rootPath
    * @param currentVersion
    */
-  private void deleteStaleVersions(String rootPath, String currentVersion) {
+  private void deleteStaleVersions(String rootPath, long currentVersion) {
     // Get all children names under path
     List<String> children = _zkBaseDataAccessor.getChildNames(rootPath, AccessOption.PERSISTENT);
     if (children == null || children.isEmpty()) {
       // The whole path has been deleted so return immediately
       return;
     }
-    filterChildrenNames(children, currentVersion);
-    List<String> pathsToDelete = getPathsToDelete(rootPath, children);
+    List<String> pathsToDelete =
+        getPathsToDelete(rootPath, filterChildrenNames(children, currentVersion));
     for (String pathToDelete : pathsToDelete) {
       // TODO: Should be batch delete but it doesn't work. It's okay since this runs async
       _zkBaseDataAccessor.remove(pathToDelete, AccessOption.PERSISTENT);
@@ -363,29 +366,32 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
 
   /**
    * Filter out non-version children names and non-stale versions.
-   * @param childrenToRemove
+   * @param childrenNodes
+   * @param currentVersion
+   * @return The filtered child node names to be removed
    */
-  private void filterChildrenNames(List<String> childrenToRemove, String currentVersion) {
-    // Leave out metadata
-    childrenToRemove.remove(LAST_SUCCESSFUL_WRITE_KEY);
-    childrenToRemove.remove(LAST_WRITE_KEY);
-
-    // Leave out currentVersion and above
-    // This is because we want to honor the TTL for newer versions
-    childrenToRemove.remove(currentVersion);
-    long currentVer = Long.parseLong(currentVersion);
-    for (String child : childrenToRemove) {
+  private List<String> filterChildrenNames(List<String> childrenNodes, long currentVersion) {
+    List<String> childrenToRemove = new ArrayList<>();
+    for (String child : childrenNodes) {
+      // Leave out metadata
+      if (child.equals(LAST_SUCCESSFUL_WRITE_KEY) || child.equals(LAST_WRITE_KEY)) {
+        continue;
+      }
+      long childVer;
       try {
-        long version = Long.parseLong(child);
-        if (version >= currentVer) {
-          childrenToRemove.remove(child);
-        }
-      } catch (Exception e) {
+        childVer = Long.parseLong(child);
+      } catch (NumberFormatException ex) {
+        LOG.warn("Found an invalid ZNode: {}", child);
         // Ignore ZNode names that aren't parseable
-        childrenToRemove.remove(child);
-        LOG.debug("Found an invalid ZNode: {}", child);
+        continue;
+      }
+      if (childVer < currentVersion) {
+        childrenToRemove.add(child);
       }
+      // Leave out currentVersion and above
+      // This is because we want to honor the TTL for newer versions
     }
+    return childrenToRemove;
   }
 
   /**
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java
index fdd0c8a..70bb949 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java
@@ -19,24 +19,25 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.BucketDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.TestHelper;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.common.ZkTestBase;
-import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
 import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
 import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
 import org.testng.Assert;
@@ -49,6 +50,7 @@ public class TestZkBucketDataAccessor extends ZkTestBase {
   private static final String NAME_KEY = TestHelper.getTestClassName();
   private static final String LAST_SUCCESSFUL_WRITE_KEY = "LAST_SUCCESSFUL_WRITE";
   private static final String LAST_WRITE_KEY = "LAST_WRITE";
+  private static final long VERSION_TTL_MS = 1000L;
 
   // Populate list and map fields for content comparison
   private static final List<String> LIST_FIELD = ImmutableList.of("1", "2");
@@ -62,7 +64,7 @@ public class TestZkBucketDataAccessor extends ZkTestBase {
   @BeforeClass
   public void beforeClass() {
     // Initialize ZK accessors for testing
-    _bucketDataAccessor = new ZkBucketDataAccessor(ZK_ADDR, 50 * 1024, 0L);
+    _bucketDataAccessor = new ZkBucketDataAccessor(ZK_ADDR, 50 * 1024, VERSION_TTL_MS);
     HelixZkClient zkClient = DedicatedZkClientFactory.getInstance()
         .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
     zkClient.setZkSerializer(new ZkSerializer() {
@@ -103,8 +105,13 @@ public class TestZkBucketDataAccessor extends ZkTestBase {
 
   @Test(dependsOnMethods = "testCompressedBucketWrite")
   public void testMultipleWrites() throws Exception {
-    int count = 50;
+    // Note: use a count < 10 for testing.
+    // Otherwise the version-named nodes may be listed in lexicographic rather than numeric order
+    // (e.g. "10" before "2"), which might hide some bugs in the GC code.
+    int count = 5;
 
+    Assert.assertTrue(VERSION_TTL_MS > 100,
+        "This test should be executed with the TTL more than 100ms.");
     // Write "count" times
     for (int i = 0; i < count; i++) {
       _bucketDataAccessor.compressedBucketWrite(PATH, new HelixProperty(record));
@@ -126,8 +133,17 @@ public class TestZkBucketDataAccessor extends ZkTestBase {
     // Use Verifier because GC can take ZK delay
     Assert.assertTrue(TestHelper.verify(() -> {
       List<String> children = _zkBaseDataAccessor.getChildNames(PATH, AccessOption.PERSISTENT);
-      return children.size() == 3;
-    }, 60 * 1000L));
+      return children.size() == 3 && children.containsAll(ImmutableList
+          .of(LAST_SUCCESSFUL_WRITE_KEY, LAST_WRITE_KEY,
+              new Long(lastSuccessfulWriteVer).toString()));
+    }, VERSION_TTL_MS * 2));
+
+    // Wait one more TTL to ensure that the GC has been done.
+    Thread.sleep(VERSION_TTL_MS);
+    List<String> children = _zkBaseDataAccessor.getChildNames(PATH, AccessOption.PERSISTENT);
+    Assert.assertTrue(children.size() == 3 && children.containsAll(ImmutableList
+        .of(LAST_SUCCESSFUL_WRITE_KEY, LAST_WRITE_KEY,
+            new Long(lastSuccessfulWriteVer).toString())));
   }
 
   /**