You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/08/13 19:43:50 UTC
[helix] branch master updated: Extend the task future tracking to a
Map instead of a single object. (#1263)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 46e9227 Extend the task future tracking to a Map instead of a single object. (#1263)
46e9227 is described below
commit 46e9227a476ccf8c68735d85943fb2835f038e54
Author: Jiajun Wang <jj...@linkedin.com>
AuthorDate: Thu Aug 13 12:43:42 2020 -0700
Extend the task future tracking to a Map instead of a single object. (#1263)
Since the ZkBucketDataAccessor is used to access multiple paths, a single task future track is not enough. This change uses a map with the path as the key to track tasks for different paths.
Also enhance the test case to cover the scenario of multiple paths being accessed in a short period.
---
.../helix/manager/zk/ZkBucketDataAccessor.java | 14 ++--
.../helix/manager/zk/TestZkBucketDataAccessor.java | 74 ++++++++++++----------
2 files changed, 51 insertions(+), 37 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
index 7d3b37b..51d96e9 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
@@ -23,6 +23,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
@@ -73,7 +74,7 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
private ZkSerializer _zkSerializer;
private RealmAwareZkClient _zkClient;
private ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor;
- private ScheduledFuture _gcTaskFuture = null;
+ private Map<String, ScheduledFuture> _gcTaskFutureMap = new HashMap<>();
/**
* Constructor that allows a custom bucket size.
@@ -239,6 +240,9 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
if (!_zkBaseDataAccessor.remove(path, AccessOption.PERSISTENT)) {
throw new HelixException(String.format("Failed to delete the bucket data! Path: %s", path));
}
+ synchronized (this) {
+ _gcTaskFutureMap.remove(path);
+ }
}
@Override
@@ -337,17 +341,17 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
}
private synchronized void updateGCTimer(String rootPath, long currentVersion) {
- if (_gcTaskFuture != null) {
- _gcTaskFuture.cancel(false);
+ if (_gcTaskFutureMap.containsKey(rootPath)) {
+ _gcTaskFutureMap.remove(rootPath).cancel(false);
}
// Schedule the gc task with TTL
- _gcTaskFuture = GC_THREAD.schedule(() -> {
+ _gcTaskFutureMap.put(rootPath, GC_THREAD.schedule(() -> {
try {
deleteStaleVersions(rootPath, currentVersion);
} catch (Exception ex) {
LOG.error("Failed to delete the stale versions.", ex);
}
- }, _versionTTLms, TimeUnit.MILLISECONDS);
+ }, _versionTTLms, TimeUnit.MILLISECONDS));
}
/**
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java
index 904204e..cef5d6b 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java
@@ -109,48 +109,58 @@ public class TestZkBucketDataAccessor extends ZkTestBase {
// Otherwise the nodes named with version number will be ordered in a different alphabet order.
// This might hide some bugs in the GC code。
int count = 5;
+ int pathCount = 2;
Assert.assertTrue(VERSION_TTL_MS > 100,
"This test should be executed with the TTL more than 100ms.");
- // Write "count" times
- for (int i = 0; i < count; i++) {
- _bucketDataAccessor.compressedBucketWrite(PATH, new HelixProperty(record));
- }
- // Last known good version number should be "count"
- byte[] binarySuccessfulWriteVer = _zkBaseDataAccessor
- .get(PATH + "/" + LAST_SUCCESSFUL_WRITE_KEY, null, AccessOption.PERSISTENT);
- long lastSuccessfulWriteVer = Long.parseLong(new String(binarySuccessfulWriteVer));
- Assert.assertEquals(lastSuccessfulWriteVer, count);
-
- // Last write version should be "count"
- byte[] binaryWriteVer =
- _zkBaseDataAccessor.get(PATH + "/" + LAST_WRITE_KEY, null, AccessOption.PERSISTENT);
- long writeVer = Long.parseLong(new String(binaryWriteVer));
- Assert.assertEquals(writeVer, count);
-
- // Test that all previous versions have been deleted
- // Use Verifier because GC can take ZK delay
- Assert.assertTrue(TestHelper.verify(() -> {
- List<String> children = _zkBaseDataAccessor.getChildNames(PATH, AccessOption.PERSISTENT);
- return children.size() == 3 && children.containsAll(ImmutableList
- .of(LAST_SUCCESSFUL_WRITE_KEY, LAST_WRITE_KEY,
- new Long(lastSuccessfulWriteVer).toString()));
- }, VERSION_TTL_MS * 2));
-
- // Wait one more TTL to ensure that the GC has been done.
- Thread.sleep(VERSION_TTL_MS);
- List<String> children = _zkBaseDataAccessor.getChildNames(PATH, AccessOption.PERSISTENT);
- Assert.assertTrue(children.size() == 3 && children.containsAll(ImmutableList
- .of(LAST_SUCCESSFUL_WRITE_KEY, LAST_WRITE_KEY,
- new Long(lastSuccessfulWriteVer).toString())));
+ try {
+ // Write "count + 1" times, so the latest version will be "count"
+ for (int i = 0; i < count + 1; i++) {
+ for (int j = 0; j < pathCount; j++) {
+ _bucketDataAccessor.compressedBucketWrite(PATH + j, new HelixProperty(record));
+ }
+ }
+
+ for (int j = 0; j < pathCount; j++) {
+ String path = PATH + j;
+ // Last known good version number should be "count"
+ byte[] binarySuccessfulWriteVer = _zkBaseDataAccessor.get(path + "/" + LAST_SUCCESSFUL_WRITE_KEY, null, AccessOption.PERSISTENT);
+ long lastSuccessfulWriteVer = Long.parseLong(new String(binarySuccessfulWriteVer));
+ Assert.assertEquals(lastSuccessfulWriteVer, count);
+
+ // Last write version should be "count"
+ byte[] binaryWriteVer = _zkBaseDataAccessor.get(path + "/" + LAST_WRITE_KEY, null, AccessOption.PERSISTENT);
+ long writeVer = Long.parseLong(new String(binaryWriteVer));
+ Assert.assertEquals(writeVer, count);
+
+ // Test that all previous versions have been deleted
+ // Use Verifier because GC can take ZK delay
+ Assert.assertTrue(TestHelper.verify(() -> {
+ List<String> children = _zkBaseDataAccessor.getChildNames(path, AccessOption.PERSISTENT);
+ return children.size() == 3 && children.containsAll(ImmutableList
+ .of(LAST_SUCCESSFUL_WRITE_KEY, LAST_WRITE_KEY, new Long(lastSuccessfulWriteVer).toString()));
+ }, VERSION_TTL_MS * 2));
+
+ // Wait one more TTL to ensure that the GC has been done.
+ Thread.sleep(VERSION_TTL_MS);
+ List<String> children = _zkBaseDataAccessor.getChildNames(path, AccessOption.PERSISTENT);
+ Assert.assertTrue(children.size() == 3 && children.containsAll(ImmutableList
+ .of(LAST_SUCCESSFUL_WRITE_KEY, LAST_WRITE_KEY, new Long(lastSuccessfulWriteVer).toString())));
+ }
+ } finally {
+ for (int j = 0; j < pathCount; j++) {
+ _bucketDataAccessor.compressedBucketDelete(PATH + j);
+ }
+ }
}
/**
* The record written in {@link #testCompressedBucketWrite()} is the same record that was written.
*/
@Test(dependsOnMethods = "testMultipleWrites")
- public void testCompressedBucketRead() {
+ public void testCompressedBucketRead() throws IOException {
+ _bucketDataAccessor.compressedBucketWrite(PATH, new HelixProperty(record));
HelixProperty readRecord = _bucketDataAccessor.compressedBucketRead(PATH, HelixProperty.class);
Assert.assertEquals(readRecord.getRecord().getSimpleField(NAME_KEY), NAME_KEY);
Assert.assertEquals(readRecord.getRecord().getListField(NAME_KEY), LIST_FIELD);