You are viewing a plain text version of this content. The canonical link to the original message was a hyperlink and is not preserved in this plain-text copy.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/03/08 19:20:57 UTC

[1/2] helix git commit: CallbackHandler to use either java config property or class annotation to enable batch callback handling.

Repository: helix
Updated Branches:
  refs/heads/master d602da9dd -> cdc3b8d60


CallbackHandler to use either java config property or class annotation to enable batch callback handling.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/2dbd88ff
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/2dbd88ff
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/2dbd88ff

Branch: refs/heads/master
Commit: 2dbd88ffed2eb84697225a58e8831b740ecd5919
Parents: d602da9
Author: Lei Xia <lx...@linkedin.com>
Authored: Fri Mar 2 14:20:14 2018 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Mar 2 14:51:44 2018 -0800

----------------------------------------------------------------------
 .../helix/manager/zk/CallbackHandler.java       |   7 ++
 .../helix/manager/zk/ZkCallbackCache.java       |   2 +-
 .../helix/TestListenerCallbackBatchMode.java    | 110 ++++++++++++++-----
 3 files changed, 92 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/2dbd88ff/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index 701cf9f..42adade 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -152,6 +153,12 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
     BatchMode batchMode = _listener.getClass().getAnnotation(BatchMode.class);
     PreFetch preFetch = _listener.getClass().getAnnotation(PreFetch.class);
 
+    String asyncBatchModeEnabled = System.getProperty("isAsyncBatchModeEnabled");
+    if (asyncBatchModeEnabled != null) {
+      _batchModeEnabled = Boolean.parseBoolean(asyncBatchModeEnabled);
+      logger.info("isAsyncBatchModeEnabled by default: " + _batchModeEnabled);
+    }
+
     if (batchMode != null) {
       _batchModeEnabled = batchMode.enabled();
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/2dbd88ff/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java
index 530d9d9..5b82242 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java
@@ -57,7 +57,7 @@ public class ZkCallbackCache<T> extends Cache<T> implements IZkChildListener, IZ
     _accessor = accessor;
     _chrootPath = chrootPath;
 
-    _listener = new ConcurrentHashMap<String, Set<HelixPropertyListener>>();
+    _listener = new ConcurrentHashMap<>();
     _eventThread = eventThread;
 
     // init cache

http://git-wip-us.apache.org/repos/asf/helix/blob/2dbd88ff/helix-core/src/test/java/org/apache/helix/TestListenerCallbackBatchMode.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestListenerCallbackBatchMode.java b/helix-core/src/test/java/org/apache/helix/TestListenerCallbackBatchMode.java
index 328098c..8cc3b0f 100644
--- a/helix-core/src/test/java/org/apache/helix/TestListenerCallbackBatchMode.java
+++ b/helix-core/src/test/java/org/apache/helix/TestListenerCallbackBatchMode.java
@@ -49,7 +49,8 @@ public class TestListenerCallbackBatchMode extends ZkUnitTestBase {
       }
     }
 
-    @Override public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs,
+    @Override
+    public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs,
         NotificationContext context) {
       if (context.getType().equals(NotificationContext.Type.CALLBACK)) {
         _instanceConfigChangedCount++;
@@ -80,6 +81,14 @@ public class TestListenerCallbackBatchMode extends ZkUnitTestBase {
     }
   }
 
+  @BatchMode (enabled = false)
+  class BatchDisableddListener extends Listener {
+    @Override
+    public void onIdealStateChange(List<IdealState> idealState, NotificationContext changeContext) {
+      super.onIdealStateChange(idealState, changeContext);
+    }
+  }
+
 
   private HelixManager _manager;
   private int _numNode = 8;
@@ -122,21 +131,45 @@ public class TestListenerCallbackBatchMode extends ZkUnitTestBase {
 
     final Listener listener = new Listener();
     addListeners(listener);
+    updateConfigs();
+    verifyNonbatchedListeners(listener);
+    removeListeners(listener);
 
+    System.out.println("END " + methodName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test (dependsOnMethods = {"testNonBatchedListener", "testBatchedListener", "testMixedListener"})
+  public void testEnableBatchedListenerByJavaProperty() throws Exception {
+    String methodName = TestHelper.getTestMethodName();
+    System.out.println("START " + methodName + " at " + new Date(System.currentTimeMillis()));
+
+    System.setProperty("isAsyncBatchModeEnabled", "true");
+
+    final Listener listener = new Listener();
+    addListeners(listener);
     updateConfigs();
+    verifyBatchedListeners(listener);
 
-    Boolean result = TestHelper.verify(new TestHelper.Verifier() {
-      @Override public boolean verify() {
-        return (listener._instanceConfigChangedCount == _numNode) && (
-            listener._idealStateChangedCount == _numResource);
-      }
-    }, 12000);
+    System.setProperty("isAsyncBatchModeEnabled", "false");
+    removeListeners(listener);
 
-    Thread.sleep(50);
+    System.out.println("END " + methodName + " at " + new Date(System.currentTimeMillis()));
+  }
 
-    Assert.assertTrue(result,
-        "non batched: instance: " + listener._instanceConfigChangedCount + ", idealstate: "
-            + listener._idealStateChangedCount + "\nbatched: instance: ");
+  @Test (dependsOnMethods = {"testNonBatchedListener", "testBatchedListener", "testMixedListener"})
+  public void testDisableBatchedListenerByAnnotation() throws Exception {
+    String methodName = TestHelper.getTestMethodName();
+    System.out.println("START " + methodName + " at " + new Date(System.currentTimeMillis()));
+
+    System.setProperty("isAsyncBatchModeEnabled", "true");
+
+    final Listener listener = new BatchDisableddListener();
+    addListeners(listener);
+    updateConfigs();
+    verifyNonbatchedListeners(listener);
+
+    System.setProperty("isAsyncBatchModeEnabled", "false");
+    removeListeners(listener);
 
     System.out.println("END " + methodName + " at " + new Date(System.currentTimeMillis()));
   }
@@ -148,17 +181,9 @@ public class TestListenerCallbackBatchMode extends ZkUnitTestBase {
 
     final BatchedListener batchListener = new BatchedListener();
     addListeners(batchListener);
-
     updateConfigs();
-
-    Thread.sleep(4000);
-
-    boolean result = (batchListener._instanceConfigChangedCount < _numNode/2) && (
-        batchListener._idealStateChangedCount < _numResource/2);
-
-    Assert.assertTrue(result,
-        "batched: instance: " + batchListener._instanceConfigChangedCount + ", idealstate: "
-            + batchListener._idealStateChangedCount);
+    verifyBatchedListeners(batchListener);
+    removeListeners(batchListener);
 
     System.out.println("END " + methodName + " at " + new Date(System.currentTimeMillis()));
   }
@@ -170,26 +195,59 @@ public class TestListenerCallbackBatchMode extends ZkUnitTestBase {
 
     final MixedListener mixedListener = new MixedListener();
     addListeners(mixedListener);
-
     updateConfigs();
 
     Thread.sleep(4000);
-
     boolean result = (mixedListener._instanceConfigChangedCount == _numNode) && (
         mixedListener._idealStateChangedCount < _numResource/2);
 
-    Assert.assertTrue(result,
-        "Mixed: instance: " + mixedListener._instanceConfigChangedCount + ", idealstate: "
-            + mixedListener._idealStateChangedCount);
+    Assert.assertTrue(result, "instance callbacks: " + mixedListener._instanceConfigChangedCount
+        + ", idealstate callbacks " + mixedListener._idealStateChangedCount + "\ninstance count: "
+        + _numNode + ", idealstate counts: " + _numResource);
+
+    removeListeners(mixedListener);
 
     System.out.println("END " + methodName + " at " + new Date(System.currentTimeMillis()));
   }
 
+  private void verifyNonbatchedListeners(final Listener listener) throws Exception {
+    Boolean result = TestHelper.verify(new TestHelper.Verifier() {
+      @Override public boolean verify() {
+        return (listener._instanceConfigChangedCount == _numNode) && (
+            listener._idealStateChangedCount == _numResource);
+      }
+    }, 12000);
+
+    Thread.sleep(50);
+    Assert.assertTrue(result,
+        "instance callbacks: " + listener._instanceConfigChangedCount + ", idealstate callbacks "
+            + listener._idealStateChangedCount + "\ninstance count: " + _numNode
+            + ", idealstate counts: " + _numResource);
+  }
+
+  private void verifyBatchedListeners(Listener batchListener) throws InterruptedException {
+    Thread.sleep(3000);
+    boolean result = (batchListener._instanceConfigChangedCount < _numNode / 2) && (
+        batchListener._idealStateChangedCount < _numResource / 2);
+
+    Assert.assertTrue(result, "instance callbacks: " + batchListener._instanceConfigChangedCount
+        + ", idealstate callbacks " + batchListener._idealStateChangedCount + "\ninstance count: "
+        + _numNode + ", idealstate counts: " + _numResource);
+
+  }
+
   private void addListeners(Listener listener) throws Exception {
     _manager.addInstanceConfigChangeListener(listener);
     _manager.addIdealStateChangeListener(listener);
   }
 
+  private void removeListeners(Listener listener) throws Exception {
+    _manager.removeListener(new PropertyKey.Builder(_manager.getClusterName()).instanceConfigs(),
+        listener);
+    _manager
+        .removeListener(new PropertyKey.Builder(_manager.getClusterName()).idealStates(), listener);
+  }
+
   private void updateConfigs() throws InterruptedException {
     final Random r = new Random(System.currentTimeMillis());
     // test change content


[2/2] helix git commit: CallbackHandler to use either java config property or class annotation to enable batch callback handling.

Posted by jx...@apache.org.
CallbackHandler to use either java config property or class annotation to enable batch callback handling.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/cdc3b8d6
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/cdc3b8d6
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/cdc3b8d6

Branch: refs/heads/master
Commit: cdc3b8d6009715564d60f6ae1e3b4064d95f4c27
Parents: 2dbd88f
Author: Lei Xia <lx...@linkedin.com>
Authored: Fri Mar 2 14:20:14 2018 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Thu Mar 8 11:13:28 2018 -0800

----------------------------------------------------------------------
 .../src/test/java/org/apache/helix/TestListenerCallback.java     | 4 ----
 1 file changed, 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/cdc3b8d6/helix-core/src/test/java/org/apache/helix/TestListenerCallback.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestListenerCallback.java b/helix-core/src/test/java/org/apache/helix/TestListenerCallback.java
index e6249a5..460a777 100644
--- a/helix-core/src/test/java/org/apache/helix/TestListenerCallback.java
+++ b/helix-core/src/test/java/org/apache/helix/TestListenerCallback.java
@@ -45,7 +45,6 @@ public class TestListenerCallback extends ZkUnitTestBase {
     public void onConfigChange(List<HelixProperty> configs, NotificationContext context) {
       _configChanged = true;
       _configSize = configs.size();
-      System.out.println("onConfigChange invoked: " + configs.size() + ", " + configs);
     }
 
     public void reset () {
@@ -68,7 +67,6 @@ public class TestListenerCallback extends ZkUnitTestBase {
         NotificationContext context) {
       _instanceConfigChanged = true;
       _instanceConfigs = instanceConfigs;
-      System.out.println("onInstanceConfigChange invoked: " + instanceConfigs);
     }
 
     @Override
@@ -76,7 +74,6 @@ public class TestListenerCallback extends ZkUnitTestBase {
         NotificationContext context) {
       _clusterConfigChanged = true;
       _clusterConfig = clusterConfig;
-      System.out.println("onClusterConfigChange invoked: " + clusterConfig);
     }
 
     @Override
@@ -84,7 +81,6 @@ public class TestListenerCallback extends ZkUnitTestBase {
         NotificationContext context) {
       _resourceConfigChanged = true;
       _resourceConfigs = resourceConfigs;
-      System.out.println("onResourceConfigChange invoked: " + resourceConfigs);
     }
 
     public void reset () {