You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/03/08 19:20:57 UTC
[1/2] helix git commit: CallbackHandler to use either java config
property or class annotation to enable batch callback handling.
Repository: helix
Updated Branches:
refs/heads/master d602da9dd -> cdc3b8d60
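CallbackHandler can now enable batch callback handling through either a Java system property (isAsyncBatchModeEnabled) or the BatchMode class annotation; when the annotation is present on the listener class, it overrides the property.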
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/2dbd88ff
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/2dbd88ff
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/2dbd88ff
Branch: refs/heads/master
Commit: 2dbd88ffed2eb84697225a58e8831b740ecd5919
Parents: d602da9
Author: Lei Xia <lx...@linkedin.com>
Authored: Fri Mar 2 14:20:14 2018 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Mar 2 14:51:44 2018 -0800
----------------------------------------------------------------------
.../helix/manager/zk/CallbackHandler.java | 7 ++
.../helix/manager/zk/ZkCallbackCache.java | 2 +-
.../helix/TestListenerCallbackBatchMode.java | 110 ++++++++++++++-----
3 files changed, 92 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/2dbd88ff/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index 701cf9f..42adade 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -152,6 +153,12 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
BatchMode batchMode = _listener.getClass().getAnnotation(BatchMode.class);
PreFetch preFetch = _listener.getClass().getAnnotation(PreFetch.class);
+ String asyncBatchModeEnabled = System.getProperty("isAsyncBatchModeEnabled");
+ if (asyncBatchModeEnabled != null) {
+ _batchModeEnabled = Boolean.parseBoolean(asyncBatchModeEnabled);
+ logger.info("isAsyncBatchModeEnabled by default: " + _batchModeEnabled);
+ }
+
if (batchMode != null) {
_batchModeEnabled = batchMode.enabled();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/2dbd88ff/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java
index 530d9d9..5b82242 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java
@@ -57,7 +57,7 @@ public class ZkCallbackCache<T> extends Cache<T> implements IZkChildListener, IZ
_accessor = accessor;
_chrootPath = chrootPath;
- _listener = new ConcurrentHashMap<String, Set<HelixPropertyListener>>();
+ _listener = new ConcurrentHashMap<>();
_eventThread = eventThread;
// init cache
http://git-wip-us.apache.org/repos/asf/helix/blob/2dbd88ff/helix-core/src/test/java/org/apache/helix/TestListenerCallbackBatchMode.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestListenerCallbackBatchMode.java b/helix-core/src/test/java/org/apache/helix/TestListenerCallbackBatchMode.java
index 328098c..8cc3b0f 100644
--- a/helix-core/src/test/java/org/apache/helix/TestListenerCallbackBatchMode.java
+++ b/helix-core/src/test/java/org/apache/helix/TestListenerCallbackBatchMode.java
@@ -49,7 +49,8 @@ public class TestListenerCallbackBatchMode extends ZkUnitTestBase {
}
}
- @Override public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs,
+ @Override
+ public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs,
NotificationContext context) {
if (context.getType().equals(NotificationContext.Type.CALLBACK)) {
_instanceConfigChangedCount++;
@@ -80,6 +81,14 @@ public class TestListenerCallbackBatchMode extends ZkUnitTestBase {
}
}
+ @BatchMode (enabled = false)
+ class BatchDisableddListener extends Listener {
+ @Override
+ public void onIdealStateChange(List<IdealState> idealState, NotificationContext changeContext) {
+ super.onIdealStateChange(idealState, changeContext);
+ }
+ }
+
private HelixManager _manager;
private int _numNode = 8;
@@ -122,21 +131,45 @@ public class TestListenerCallbackBatchMode extends ZkUnitTestBase {
final Listener listener = new Listener();
addListeners(listener);
+ updateConfigs();
+ verifyNonbatchedListeners(listener);
+ removeListeners(listener);
+ System.out.println("END " + methodName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test (dependsOnMethods = {"testNonBatchedListener", "testBatchedListener", "testMixedListener"})
+ public void testEnableBatchedListenerByJavaProperty() throws Exception {
+ String methodName = TestHelper.getTestMethodName();
+ System.out.println("START " + methodName + " at " + new Date(System.currentTimeMillis()));
+
+ System.setProperty("isAsyncBatchModeEnabled", "true");
+
+ final Listener listener = new Listener();
+ addListeners(listener);
updateConfigs();
+ verifyBatchedListeners(listener);
- Boolean result = TestHelper.verify(new TestHelper.Verifier() {
- @Override public boolean verify() {
- return (listener._instanceConfigChangedCount == _numNode) && (
- listener._idealStateChangedCount == _numResource);
- }
- }, 12000);
+ System.setProperty("isAsyncBatchModeEnabled", "false");
+ removeListeners(listener);
- Thread.sleep(50);
+ System.out.println("END " + methodName + " at " + new Date(System.currentTimeMillis()));
+ }
- Assert.assertTrue(result,
- "non batched: instance: " + listener._instanceConfigChangedCount + ", idealstate: "
- + listener._idealStateChangedCount + "\nbatched: instance: ");
+ @Test (dependsOnMethods = {"testNonBatchedListener", "testBatchedListener", "testMixedListener"})
+ public void testDisableBatchedListenerByAnnotation() throws Exception {
+ String methodName = TestHelper.getTestMethodName();
+ System.out.println("START " + methodName + " at " + new Date(System.currentTimeMillis()));
+
+ System.setProperty("isAsyncBatchModeEnabled", "true");
+
+ final Listener listener = new BatchDisableddListener();
+ addListeners(listener);
+ updateConfigs();
+ verifyNonbatchedListeners(listener);
+
+ System.setProperty("isAsyncBatchModeEnabled", "false");
+ removeListeners(listener);
System.out.println("END " + methodName + " at " + new Date(System.currentTimeMillis()));
}
@@ -148,17 +181,9 @@ public class TestListenerCallbackBatchMode extends ZkUnitTestBase {
final BatchedListener batchListener = new BatchedListener();
addListeners(batchListener);
-
updateConfigs();
-
- Thread.sleep(4000);
-
- boolean result = (batchListener._instanceConfigChangedCount < _numNode/2) && (
- batchListener._idealStateChangedCount < _numResource/2);
-
- Assert.assertTrue(result,
- "batched: instance: " + batchListener._instanceConfigChangedCount + ", idealstate: "
- + batchListener._idealStateChangedCount);
+ verifyBatchedListeners(batchListener);
+ removeListeners(batchListener);
System.out.println("END " + methodName + " at " + new Date(System.currentTimeMillis()));
}
@@ -170,26 +195,59 @@ public class TestListenerCallbackBatchMode extends ZkUnitTestBase {
final MixedListener mixedListener = new MixedListener();
addListeners(mixedListener);
-
updateConfigs();
Thread.sleep(4000);
-
boolean result = (mixedListener._instanceConfigChangedCount == _numNode) && (
mixedListener._idealStateChangedCount < _numResource/2);
- Assert.assertTrue(result,
- "Mixed: instance: " + mixedListener._instanceConfigChangedCount + ", idealstate: "
- + mixedListener._idealStateChangedCount);
+ Assert.assertTrue(result, "instance callbacks: " + mixedListener._instanceConfigChangedCount
+ + ", idealstate callbacks " + mixedListener._idealStateChangedCount + "\ninstance count: "
+ + _numNode + ", idealstate counts: " + _numResource);
+
+ removeListeners(mixedListener);
System.out.println("END " + methodName + " at " + new Date(System.currentTimeMillis()));
}
+ private void verifyNonbatchedListeners(final Listener listener) throws Exception {
+ Boolean result = TestHelper.verify(new TestHelper.Verifier() {
+ @Override public boolean verify() {
+ return (listener._instanceConfigChangedCount == _numNode) && (
+ listener._idealStateChangedCount == _numResource);
+ }
+ }, 12000);
+
+ Thread.sleep(50);
+ Assert.assertTrue(result,
+ "instance callbacks: " + listener._instanceConfigChangedCount + ", idealstate callbacks "
+ + listener._idealStateChangedCount + "\ninstance count: " + _numNode
+ + ", idealstate counts: " + _numResource);
+ }
+
+ private void verifyBatchedListeners(Listener batchListener) throws InterruptedException {
+ Thread.sleep(3000);
+ boolean result = (batchListener._instanceConfigChangedCount < _numNode / 2) && (
+ batchListener._idealStateChangedCount < _numResource / 2);
+
+ Assert.assertTrue(result, "instance callbacks: " + batchListener._instanceConfigChangedCount
+ + ", idealstate callbacks " + batchListener._idealStateChangedCount + "\ninstance count: "
+ + _numNode + ", idealstate counts: " + _numResource);
+
+ }
+
private void addListeners(Listener listener) throws Exception {
_manager.addInstanceConfigChangeListener(listener);
_manager.addIdealStateChangeListener(listener);
}
+ private void removeListeners(Listener listener) throws Exception {
+ _manager.removeListener(new PropertyKey.Builder(_manager.getClusterName()).instanceConfigs(),
+ listener);
+ _manager
+ .removeListener(new PropertyKey.Builder(_manager.getClusterName()).idealStates(), listener);
+ }
+
private void updateConfigs() throws InterruptedException {
final Random r = new Random(System.currentTimeMillis());
// test change content
[2/2] helix git commit: CallbackHandler to use either java config
property or class annotation to enable batch callback handling.
Posted by jx...@apache.org.
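CallbackHandler can now enable batch callback handling through either a Java system property (isAsyncBatchModeEnabled) or the BatchMode class annotation; when the annotation is present on the listener class, it overrides the property.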
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/cdc3b8d6
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/cdc3b8d6
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/cdc3b8d6
Branch: refs/heads/master
Commit: cdc3b8d6009715564d60f6ae1e3b4064d95f4c27
Parents: 2dbd88f
Author: Lei Xia <lx...@linkedin.com>
Authored: Fri Mar 2 14:20:14 2018 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Thu Mar 8 11:13:28 2018 -0800
----------------------------------------------------------------------
.../src/test/java/org/apache/helix/TestListenerCallback.java | 4 ----
1 file changed, 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/cdc3b8d6/helix-core/src/test/java/org/apache/helix/TestListenerCallback.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestListenerCallback.java b/helix-core/src/test/java/org/apache/helix/TestListenerCallback.java
index e6249a5..460a777 100644
--- a/helix-core/src/test/java/org/apache/helix/TestListenerCallback.java
+++ b/helix-core/src/test/java/org/apache/helix/TestListenerCallback.java
@@ -45,7 +45,6 @@ public class TestListenerCallback extends ZkUnitTestBase {
public void onConfigChange(List<HelixProperty> configs, NotificationContext context) {
_configChanged = true;
_configSize = configs.size();
- System.out.println("onConfigChange invoked: " + configs.size() + ", " + configs);
}
public void reset () {
@@ -68,7 +67,6 @@ public class TestListenerCallback extends ZkUnitTestBase {
NotificationContext context) {
_instanceConfigChanged = true;
_instanceConfigs = instanceConfigs;
- System.out.println("onInstanceConfigChange invoked: " + instanceConfigs);
}
@Override
@@ -76,7 +74,6 @@ public class TestListenerCallback extends ZkUnitTestBase {
NotificationContext context) {
_clusterConfigChanged = true;
_clusterConfig = clusterConfig;
- System.out.println("onClusterConfigChange invoked: " + clusterConfig);
}
@Override
@@ -84,7 +81,6 @@ public class TestListenerCallback extends ZkUnitTestBase {
NotificationContext context) {
_resourceConfigChanged = true;
_resourceConfigs = resourceConfigs;
- System.out.println("onResourceConfigChange invoked: " + resourceConfigs);
}
public void reset () {