You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ca...@apache.org on 2016/08/31 01:01:53 UTC
[2/3] curator git commit: CURATOR-344 - do watch event processing
tasks in the background and limit shared value watcher to valid change events
to avoid work on disconnect
CURATOR-344 - do watch event processing tasks in the background and limit shared value watcher to valid change events to avoid work on disconnect
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/022de392
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/022de392
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/022de392
Branch: refs/heads/CURATOR-3.0
Commit: 022de3921a120c6f86cc6e21442327cc04b66cd2
Parents: ef33ccb
Author: gtully <ga...@gmail.com>
Authored: Thu Aug 18 19:34:10 2016 +0100
Committer: gtully <ga...@gmail.com>
Committed: Tue Aug 30 13:09:56 2016 +0100
----------------------------------------------------------------------
.../framework/recipes/shared/SharedValue.java | 24 ++++-
.../recipes/shared/TestSharedCount.java | 106 +++++++++++++++++++
2 files changed, 127 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/022de392/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index dddc471..1f9df37 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@ -22,6 +22,8 @@ package org.apache.curator.framework.recipes.shared;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.listen.ListenerContainer;
import org.apache.curator.framework.state.ConnectionState;
@@ -30,6 +32,7 @@ import org.apache.curator.utils.PathUtils;
import org.apache.curator.utils.ThreadUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,10 +62,10 @@ public class SharedValue implements Closeable, SharedValueReader
@Override
public void process(WatchedEvent event) throws Exception
{
- if ( state.get() == State.STARTED )
+ if ( state.get() == State.STARTED && event.getType() != Watcher.Event.EventType.None )
{
- readValue();
- notifyListeners();
+ // don't block event thread in possible retry
+ readValueAndNotifyListenersInBackground();
}
}
};
@@ -248,6 +251,21 @@ public class SharedValue implements Closeable, SharedValueReader
updateValue(localStat.getVersion(), bytes);
}
+ private final BackgroundCallback upadateAndNotifyListenerCallback = new BackgroundCallback() {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+ if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
+ updateValue(event.getStat().getVersion(), event.getData());
+ notifyListeners();
+ }
+ }
+ };
+
+ private void readValueAndNotifyListenersInBackground() throws Exception
+ {
+ client.getData().usingWatcher(watcher).inBackground(upadateAndNotifyListenerCallback).forPath(path);
+ }
+
private void notifyListeners()
{
final byte[] localValue = getValue();
http://git-wip-us.apache.org/repos/asf/curator/blob/022de392/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index a1f4d8c..7939f6e 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@ -23,7 +23,11 @@ import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.Timing;
@@ -40,6 +44,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
public class TestSharedCount extends BaseClassForTests
{
@@ -283,4 +288,105 @@ public class TestSharedCount extends BaseClassForTests
CloseableUtils.closeQuietly(client1);
}
}
+
+
+ @Test
+ public void testDisconnectEventOnWatcherDoesNotRetry() throws Exception
+ {
+ final CountDownLatch gotSuspendEvent = new CountDownLatch(1);
+
+ CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 1000));
+ curatorFramework.start();
+ curatorFramework.blockUntilConnected();
+
+ SharedCount sharedCount = new SharedCount(curatorFramework, "/count", 10);
+ sharedCount.start();
+
+ curatorFramework.getConnectionStateListenable().addListener(new ConnectionStateListener() {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState) {
+ if (newState == ConnectionState.SUSPENDED) {
+ gotSuspendEvent.countDown();
+ }
+ }
+ });
+
+ try
+ {
+ server.stop();
+ // if watcher goes into 10second retry loop we won't get timely notification
+ Assert.assertTrue(gotSuspendEvent.await(5, TimeUnit.SECONDS));
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(sharedCount);
+ CloseableUtils.closeQuietly(curatorFramework);
+ }
+ }
+
+ @Test
+ public void testDisconnectReconnectEventDoesNotFireValueWatcher() throws Exception
+ {
+ final CountDownLatch gotSuspendEvent = new CountDownLatch(1);
+ final CountDownLatch gotChangeEvent = new CountDownLatch(1);
+ final CountDownLatch getReconnectEvent = new CountDownLatch(1);
+
+ final AtomicInteger numChangeEvents = new AtomicInteger(0);
+
+
+ CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
+ curatorFramework.start();
+ curatorFramework.blockUntilConnected();
+
+ SharedCount sharedCount = new SharedCount(curatorFramework, "/count", 10);
+
+ sharedCount.addListener(new SharedCountListener() {
+ @Override
+ public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
+ numChangeEvents.incrementAndGet();
+ gotChangeEvent.countDown();
+ }
+
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState) {
+ if (newState == ConnectionState.SUSPENDED) {
+ gotSuspendEvent.countDown();
+ } else if (newState == ConnectionState.RECONNECTED) {
+ getReconnectEvent.countDown();
+ }
+ }
+ });
+ sharedCount.start();
+
+ try
+ {
+ sharedCount.setCount(11);
+ Assert.assertTrue(gotChangeEvent.await(2, TimeUnit.SECONDS));
+
+ server.stop();
+ Assert.assertTrue(gotSuspendEvent.await(2, TimeUnit.SECONDS));
+
+ server.restart();
+ Assert.assertTrue(getReconnectEvent.await(2, TimeUnit.SECONDS));
+
+ sharedCount.trySetCount(sharedCount.getVersionedValue(), 12);
+
+ // flush background task queue
+ final CountDownLatch flushDone = new CountDownLatch(1);
+ curatorFramework.getData().inBackground(new BackgroundCallback() {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+ flushDone.countDown();
+ }
+ }).forPath("/count");
+ flushDone.await(5, TimeUnit.SECONDS);
+
+ Assert.assertEquals(2, numChangeEvents.get());
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(sharedCount);
+ CloseableUtils.closeQuietly(curatorFramework);
+ }
+ }
}