You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/07/10 17:09:06 UTC
[1/2] curator git commit: Make sure readValueAndNotifyListenersInBackground() is called after a connection problem
Repository: curator
Updated Branches:
refs/heads/CURATOR-2.0 c7df8e251 -> 6af5f367e
Make sure readValueAndNotifyListenersInBackground() is called after a connection problem
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/3e159bdd
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/3e159bdd
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/3e159bdd
Branch: refs/heads/CURATOR-2.0
Commit: 3e159bddb91c6e8b4e9e27ccbf00e06b4f35638e
Parents: c7df8e2
Author: randgalt <ra...@apache.org>
Authored: Mon Jul 10 11:05:57 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jul 10 11:21:57 2017 -0500
----------------------------------------------------------------------
.../framework/recipes/shared/SharedValue.java | 30 +++-
.../recipes/shared/TestSharedCount.java | 148 ++++++++++++++++++-
2 files changed, 175 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/3e159bdd/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index 1f9df37..5478a8f 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@ -19,6 +19,7 @@
package org.apache.curator.framework.recipes.shared;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
@@ -56,8 +57,9 @@ public class SharedValue implements Closeable, SharedValueReader
private final byte[] seedValue;
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
private final AtomicReference<VersionedValue<byte[]>> currentValue;
+ private final CuratorWatcher watcher;
- private final CuratorWatcher watcher = new CuratorWatcher()
+ private class SharedValueCuratorWatcher implements CuratorWatcher
{
@Override
public void process(WatchedEvent event) throws Exception
@@ -68,7 +70,7 @@ public class SharedValue implements Closeable, SharedValueReader
readValueAndNotifyListenersInBackground();
}
}
- };
+ }
private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
{
@@ -76,6 +78,18 @@ public class SharedValue implements Closeable, SharedValueReader
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
notifyListenerOfStateChanged(newState);
+ if ( newState.isConnected() )
+ {
+ try
+ {
+ readValueAndNotifyListenersInBackground();
+ }
+ catch ( Exception e )
+ {
+ ThreadUtils.checkInterrupted(e);
+ log.error("Could not read value after reconnect", e);
+ }
+ }
}
};
@@ -96,6 +110,18 @@ public class SharedValue implements Closeable, SharedValueReader
this.client = client;
this.path = PathUtils.validatePath(path);
this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
+ this.watcher = new SharedValueCuratorWatcher();
+ currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
+ }
+
+ @VisibleForTesting
+ protected SharedValue(CuratorFramework client, String path, byte[] seedValue, CuratorWatcher watcher)
+ {
+ this.client = client;
+ this.path = PathUtils.validatePath(path);
+ this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
+ // inject watcher for testing
+ this.watcher = watcher;
currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
}
http://git-wip-us.apache.org/repos/asf/curator/blob/3e159bdd/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index 7939f6e..a6a32e9 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@ -25,6 +25,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryNTimes;
@@ -32,6 +33,7 @@ import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.WatchedEvent;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.List;
@@ -42,6 +44,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -167,10 +170,29 @@ public class TestSharedCount extends BaseClassForTests
client.start();
count.start();
+ final CountDownLatch setLatch = new CountDownLatch(3);
+ SharedCountListener listener = new SharedCountListener()
+ {
+ @Override
+ public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception
+ {
+ setLatch.countDown();
+ }
+
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ // nop
+ }
+ };
+ count.addListener(listener);
+
Assert.assertTrue(count.trySetCount(1));
Assert.assertTrue(count.trySetCount(2));
Assert.assertTrue(count.trySetCount(10));
Assert.assertEquals(count.getCount(), 10);
+
+ Assert.assertTrue(new Timing().awaitLatch(setLatch));
}
finally
{
@@ -246,12 +268,30 @@ public class TestSharedCount extends BaseClassForTests
Assert.assertTrue(count2.trySetCount(versionedValue, 20));
timing.sleepABit();
+ final CountDownLatch setLatch = new CountDownLatch(2);
+ SharedCountListener listener = new SharedCountListener()
+ {
+ @Override
+ public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception
+ {
+ setLatch.countDown();
+ }
+
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ // nop
+ }
+ };
+ count1.addListener(listener);
VersionedValue<Integer> versionedValue1 = count1.getVersionedValue();
VersionedValue<Integer> versionedValue2 = count2.getVersionedValue();
Assert.assertTrue(count2.trySetCount(versionedValue2, 30));
Assert.assertFalse(count1.trySetCount(versionedValue1, 40));
+
versionedValue1 = count1.getVersionedValue();
Assert.assertTrue(count1.trySetCount(versionedValue1, 40));
+ Assert.assertTrue(timing.awaitLatch(setLatch));
}
finally
{
@@ -368,6 +408,7 @@ public class TestSharedCount extends BaseClassForTests
server.restart();
Assert.assertTrue(getReconnectEvent.await(2, TimeUnit.SECONDS));
+ Assert.assertEquals(numChangeEvents.get(), 1);
sharedCount.trySetCount(sharedCount.getVersionedValue(), 12);
@@ -381,7 +422,9 @@ public class TestSharedCount extends BaseClassForTests
}).forPath("/count");
flushDone.await(5, TimeUnit.SECONDS);
- Assert.assertEquals(2, numChangeEvents.get());
+ // CURATOR-311: when a Curator client's state became RECONNECTED, countHasChanged method is called back
+ // because the Curator client calls readValueAndNotifyListenersInBackground in SharedValue#ConnectionStateListener#stateChanged.
+ Assert.assertEquals(numChangeEvents.get(), 3);
}
finally
{
@@ -389,4 +432,107 @@ public class TestSharedCount extends BaseClassForTests
CloseableUtils.closeQuietly(curatorFramework);
}
}
+
+ @Test
+ public void testDisconnectReconnectWithMultipleClients() throws Exception
+ {
+ CuratorFramework curatorFramework1 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
+ CuratorFramework curatorFramework2 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
+
+ curatorFramework1.start();
+ curatorFramework1.blockUntilConnected();
+ curatorFramework2.start();
+ curatorFramework2.blockUntilConnected();
+
+ final String sharedCountPath = "/count";
+ final int initialCount = 10;
+ SharedCount sharedCount1 = new SharedCount(curatorFramework1, sharedCountPath, initialCount);
+ SharedCount sharedCountWithFaultyWatcher = createSharedCountWithFaultyWatcher(curatorFramework2, sharedCountPath, initialCount);
+
+ class MySharedCountListener implements SharedCountListener
+ {
+ final public Phaser gotSuspendEvent = new Phaser(1);
+ final public Phaser gotChangeEvent = new Phaser(1);
+ final public Phaser getReconnectEvent = new Phaser(1);
+ final public AtomicInteger numChangeEvents = new AtomicInteger(0);
+
+ @Override
+ public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception
+ {
+ numChangeEvents.incrementAndGet();
+ gotChangeEvent.arrive();
+ }
+
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if (newState == ConnectionState.SUSPENDED) {
+ gotSuspendEvent.arrive();
+ } else if (newState.isConnected()) {
+ getReconnectEvent.arrive();
+ }
+ }
+ }
+
+ MySharedCountListener listener1 = new MySharedCountListener();
+ sharedCount1.addListener(listener1);
+ sharedCount1.start();
+ MySharedCountListener listener2 = new MySharedCountListener();
+ sharedCountWithFaultyWatcher.addListener(listener2);
+
+ try
+ {
+ sharedCount1.setCount(12);
+ Assert.assertEquals(listener1.gotChangeEvent.awaitAdvanceInterruptibly(0, 2, TimeUnit.SECONDS), 1);
+ Assert.assertEquals(sharedCount1.getCount(), 12);
+
+ Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 10);
+ // new counter with faultyWatcher start
+ sharedCountWithFaultyWatcher.start();
+
+ for (int i = 0; i < 10; i++) {
+ sharedCount1.setCount(13 + i);
+ Assert.assertEquals(sharedCount1.getCount(), 13 + i);
+
+ server.restart();
+
+ Assert.assertEquals(listener2.getReconnectEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1);
+ // CURATOR-311 introduces to Curator's client reading server's shared count value
+ // when client's state gets ConnectionState.RECONNECTED. Following tests ensures that.
+ Assert.assertEquals(listener2.gotChangeEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1);
+ Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 13 + i);
+ }
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(sharedCount1);
+ CloseableUtils.closeQuietly(curatorFramework1);
+ CloseableUtils.closeQuietly(sharedCountWithFaultyWatcher);
+ CloseableUtils.closeQuietly(curatorFramework2);
+ }
+ }
+
+ private SharedCount createSharedCountWithFaultyWatcher(CuratorFramework curatorFramework, String path, int val) {
+
+ final CuratorWatcher faultyWatcher = new CuratorWatcher() {
+ @Override
+ public void process(WatchedEvent event) throws Exception {
+ // everything will be ignored
+ }
+ };
+
+ class FaultySharedValue extends SharedValue {
+ public FaultySharedValue(CuratorFramework client, String path, byte[] seedValue) {
+ super(client, path, seedValue, faultyWatcher);
+ }
+ };
+
+ final SharedValue faultySharedValue = new FaultySharedValue(curatorFramework, path, SharedCount.toBytes(val));
+ class FaultySharedCount extends SharedCount {
+ public FaultySharedCount(CuratorFramework client, String path, int val) {
+ super(client, path, val);
+ }
+ };
+ return new FaultySharedCount(curatorFramework, path, val);
+ }
}
[2/2] curator git commit: fixed some merge problems from CURATOR-311 merge
Posted by ra...@apache.org.
fixed some merge problems from CURATOR-311 merge
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/6af5f367
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/6af5f367
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/6af5f367
Branch: refs/heads/CURATOR-2.0
Commit: 6af5f367efd238c7e61f0b9ea341d69c7fdaa736
Parents: 3e159bd
Author: randgalt <ra...@apache.org>
Authored: Mon Jul 10 12:09:01 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jul 10 12:09:01 2017 -0500
----------------------------------------------------------------------
.../framework/recipes/shared/SharedCount.java | 5 +++++
.../framework/recipes/shared/TestSharedCount.java | 17 ++++++++++-------
2 files changed, 15 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/6af5f367/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
index 87fffdd..bdfa844 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
@@ -49,6 +49,11 @@ public class SharedCount implements Closeable, SharedCountReader, Listenable<Sha
sharedValue = new SharedValue(client, path, toBytes(seedValue));
}
+ protected SharedCount(CuratorFramework client, String path, SharedValue sv)
+ {
+ sharedValue = sv;
+ }
+
@Override
public int getCount()
{
http://git-wip-us.apache.org/repos/asf/curator/blob/6af5f367/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index a6a32e9..2ef93eb 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@ -77,6 +77,7 @@ public class TestSharedCount extends BaseClassForTests
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
clients.add(client);
client.start();
+ client.checkExists().forPath("/"); // clear initial connect event
SharedCount count = new SharedCount(client, "/count", 10);
counts.add(count);
@@ -120,6 +121,7 @@ public class TestSharedCount extends BaseClassForTests
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
clients.add(client);
client.start();
+ client.checkExists().forPath("/"); // clear initial connect event
Assert.assertTrue(startLatch.await(10, TimeUnit.SECONDS));
@@ -436,13 +438,14 @@ public class TestSharedCount extends BaseClassForTests
@Test
public void testDisconnectReconnectWithMultipleClients() throws Exception
{
+ Timing timing = new Timing();
CuratorFramework curatorFramework1 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
CuratorFramework curatorFramework2 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
curatorFramework1.start();
- curatorFramework1.blockUntilConnected();
+ curatorFramework1.checkExists().forPath("/"); // clear initial connect events
curatorFramework2.start();
- curatorFramework2.blockUntilConnected();
+ curatorFramework2.checkExists().forPath("/"); // clear initial connect events
final String sharedCountPath = "/count";
final int initialCount = 10;
@@ -468,7 +471,7 @@ public class TestSharedCount extends BaseClassForTests
{
if (newState == ConnectionState.SUSPENDED) {
gotSuspendEvent.arrive();
- } else if (newState.isConnected()) {
+ } else if (newState == ConnectionState.RECONNECTED) {
getReconnectEvent.arrive();
}
}
@@ -483,7 +486,7 @@ public class TestSharedCount extends BaseClassForTests
try
{
sharedCount1.setCount(12);
- Assert.assertEquals(listener1.gotChangeEvent.awaitAdvanceInterruptibly(0, 2, TimeUnit.SECONDS), 1);
+ Assert.assertEquals(listener1.gotChangeEvent.awaitAdvanceInterruptibly(0, timing.forWaiting().seconds(), TimeUnit.SECONDS), 1);
Assert.assertEquals(sharedCount1.getCount(), 12);
Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 10);
@@ -496,10 +499,10 @@ public class TestSharedCount extends BaseClassForTests
server.restart();
- Assert.assertEquals(listener2.getReconnectEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1);
+ Assert.assertEquals(listener2.getReconnectEvent.awaitAdvanceInterruptibly(i, timing.forWaiting().seconds(), TimeUnit.SECONDS), i + 1);
// CURATOR-311 introduces to Curator's client reading server's shared count value
// when client's state gets ConnectionState.RECONNECTED. Following tests ensures that.
- Assert.assertEquals(listener2.gotChangeEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1);
+ Assert.assertEquals(listener2.gotChangeEvent.awaitAdvanceInterruptibly(i, timing.forWaiting().seconds(), TimeUnit.SECONDS), i + 1);
Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 13 + i);
}
}
@@ -530,7 +533,7 @@ public class TestSharedCount extends BaseClassForTests
final SharedValue faultySharedValue = new FaultySharedValue(curatorFramework, path, SharedCount.toBytes(val));
class FaultySharedCount extends SharedCount {
public FaultySharedCount(CuratorFramework client, String path, int val) {
- super(client, path, val);
+ super(client, path, faultySharedValue);
}
};
return new FaultySharedCount(curatorFramework, path, val);