You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/07/10 17:10:51 UTC

[1/5] curator git commit: CURATOR-311 - SharedValue could hold stale data after reconnecting

Repository: curator
Updated Branches:
  refs/heads/master 0f5d10da3 -> 12cc7cec5


CURATOR-311 - SharedValue could hold stale data after reconnecting


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/8c1c5ffa
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/8c1c5ffa
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/8c1c5ffa

Branch: refs/heads/master
Commit: 8c1c5ffa287d22eaea18bf6f89a4a8bf6d9b871c
Parents: 35d2cc0
Author: Tsuyoshi Ozawa <oz...@apache.org>
Authored: Wed Jan 11 20:30:46 2017 +0900
Committer: Tsuyoshi Ozawa <oz...@apache.org>
Committed: Fri Feb 24 16:34:51 2017 +0900

----------------------------------------------------------------------
 .../framework/recipes/shared/SharedCount.java   |   5 +
 .../framework/recipes/shared/SharedValue.java   |  27 ++++-
 .../recipes/shared/TestSharedCount.java         | 116 ++++++++++++++++++-
 3 files changed, 146 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/8c1c5ffa/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
index 87fffdd..bdfa844 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
@@ -49,6 +49,11 @@ public class SharedCount implements Closeable, SharedCountReader, Listenable<Sha
         sharedValue = new SharedValue(client, path, toBytes(seedValue));
     }
 
+    protected SharedCount(CuratorFramework client, String path, SharedValue sv)
+    {
+        sharedValue = sv;
+    }
+
     @Override
     public int getCount()
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/8c1c5ffa/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index 1f9df37..7e3f26a 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@ -19,6 +19,7 @@
 
 package org.apache.curator.framework.recipes.shared;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
@@ -56,8 +57,9 @@ public class SharedValue implements Closeable, SharedValueReader
     private final byte[] seedValue;
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
     private final AtomicReference<VersionedValue<byte[]>> currentValue;
+    private final CuratorWatcher watcher;
 
-    private final CuratorWatcher watcher = new CuratorWatcher()
+    private class SharedValueCuratorWatcher implements CuratorWatcher
     {
         @Override
         public void process(WatchedEvent event) throws Exception
@@ -76,6 +78,17 @@ public class SharedValue implements Closeable, SharedValueReader
         public void stateChanged(CuratorFramework client, ConnectionState newState)
         {
             notifyListenerOfStateChanged(newState);
+            if ( newState == ConnectionState.RECONNECTED )
+            {
+                try
+                {
+                    readValueAndNotifyListenersInBackground();
+                }
+                catch ( Exception e )
+                {
+                    log.error("Could not read value after reconnect", e);
+                }
+            }
         }
     };
 
@@ -96,6 +109,18 @@ public class SharedValue implements Closeable, SharedValueReader
         this.client = client;
         this.path = PathUtils.validatePath(path);
         this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
+        this.watcher = new SharedValueCuratorWatcher();
+        currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
+    }
+
+    @VisibleForTesting
+    protected SharedValue(CuratorFramework client, String path, byte[] seedValue, CuratorWatcher watcher)
+    {
+        this.client = client;
+        this.path = PathUtils.validatePath(path);
+        this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
+        // inject watcher for testing
+        this.watcher = watcher;
         currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
     }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/8c1c5ffa/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index 7939f6e..330c8f4 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@ -25,6 +25,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorWatcher;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryNTimes;
@@ -32,6 +33,7 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.WatchedEvent;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.List;
@@ -42,6 +44,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -368,6 +371,7 @@ public class TestSharedCount extends BaseClassForTests
 
             server.restart();
             Assert.assertTrue(getReconnectEvent.await(2, TimeUnit.SECONDS));
+            Assert.assertEquals(numChangeEvents.get(), 1);
 
             sharedCount.trySetCount(sharedCount.getVersionedValue(), 12);
 
@@ -381,7 +385,9 @@ public class TestSharedCount extends BaseClassForTests
             }).forPath("/count");
             flushDone.await(5, TimeUnit.SECONDS);
 
-            Assert.assertEquals(2, numChangeEvents.get());
+            // CURATOR-311: when a Curator client's state became RECONNECTED, countHasChanged method is called back
+            // because the Curator client calls readValueAndNotifyListenersInBackground in SharedValue#ConnectionStateListener#stateChanged.
+            Assert.assertEquals(numChangeEvents.get(), 3);
         }
         finally
         {
@@ -389,4 +395,112 @@ public class TestSharedCount extends BaseClassForTests
             CloseableUtils.closeQuietly(curatorFramework);
         }
     }
+
+
+    @Test
+    public void testDisconnectReconnectWithMultipleClients() throws Exception
+    {
+        CuratorFramework curatorFramework1 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
+        CuratorFramework curatorFramework2 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
+
+        curatorFramework1.start();
+        curatorFramework1.blockUntilConnected();
+        curatorFramework2.start();
+        curatorFramework2.blockUntilConnected();
+
+        final String sharedCountPath = "/count";
+        final int initialCount = 10;
+        SharedCount sharedCount1 = new SharedCount(curatorFramework1, sharedCountPath, initialCount);
+        SharedCount sharedCountWithFaultyWatcher = createSharedCountWithFaultyWatcher(curatorFramework2, sharedCountPath, initialCount);
+
+        class MySharedCountListener implements SharedCountListener
+        {
+            final public Phaser gotSuspendEvent = new Phaser(1);
+            final public Phaser gotChangeEvent = new Phaser(1);
+            final public Phaser getReconnectEvent = new Phaser(1);
+            final public AtomicInteger numChangeEvents = new AtomicInteger(0);
+
+            @Override
+            public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception
+            {
+                numChangeEvents.incrementAndGet();
+                gotChangeEvent.arrive();
+            }
+
+            @Override
+            public void stateChanged(CuratorFramework client, ConnectionState newState)
+            {
+                if (newState == ConnectionState.SUSPENDED) {
+                    gotSuspendEvent.arrive();
+                } else if (newState == ConnectionState.RECONNECTED) {
+                    getReconnectEvent.arrive();
+                }
+            }
+        }
+
+        MySharedCountListener listener1 = new MySharedCountListener();
+        sharedCount1.addListener(listener1);
+        sharedCount1.start();
+        MySharedCountListener listener2 = new MySharedCountListener();
+        sharedCountWithFaultyWatcher.addListener(listener2);
+
+        try
+        {
+            sharedCount1.setCount(12);
+            Assert.assertEquals(listener1.gotChangeEvent.awaitAdvanceInterruptibly(0, 2, TimeUnit.SECONDS), 1);
+            Assert.assertEquals(sharedCount1.getCount(), 12);
+
+            Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 10);
+            // new counter with faultyWatcher start
+            sharedCountWithFaultyWatcher.start();
+
+            for (int i = 0; i < 10; i++) {
+                sharedCount1.setCount(13 + i);
+                Assert.assertEquals(sharedCount1.getCount(), 13 + i);
+
+                server.restart();
+
+                Assert.assertEquals(listener2.getReconnectEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1);
+                // CURATOR-311 introduces to Curator's client reading server's shared count value
+                // when client's state gets ConnectionState.RECONNECTED. Following tests ensures that.
+                Assert.assertEquals(listener2.gotChangeEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1);
+                Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 13 + i);
+            }
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(sharedCount1);
+            CloseableUtils.closeQuietly(curatorFramework1);
+            CloseableUtils.closeQuietly(sharedCountWithFaultyWatcher);
+            CloseableUtils.closeQuietly(curatorFramework2);
+        }
+    }
+
+    private SharedCount createSharedCountWithFaultyWatcher(CuratorFramework curatorFramework, String path, int val) {
+
+        class FaultyCuratorWatcher implements CuratorWatcher {
+            @Override
+            public void process(WatchedEvent event) throws Exception {
+                // everything will be ignored
+            }
+        }
+
+        final FaultyCuratorWatcher fautlyWatcher = new FaultyCuratorWatcher();
+
+        class FaultySharedValue extends SharedValue {
+            public FaultySharedValue(CuratorFramework client, String path, byte[] seedValue) {
+                super(client, path, seedValue, fautlyWatcher);
+            }
+        };
+
+        final SharedValue faultySharedValue = new FaultySharedValue(curatorFramework, path, SharedCount.toBytes(val));
+        class FaultySharedCount extends SharedCount {
+            public FaultySharedCount(CuratorFramework client, String path, int val) {
+                super(client, path, faultySharedValue);
+            }
+        };
+        return new FaultySharedCount(curatorFramework, path, val);
+    }
+
+
 }


[3/5] curator git commit: Make sure readValueAndNotifyListenersInBackground() is called after a connection problem

Posted by ra...@apache.org.
Make sure readValueAndNotifyListenersInBackground() is called after a connection problem


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/5de6b818
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/5de6b818
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/5de6b818

Branch: refs/heads/master
Commit: 5de6b818a8180291a6769e8db7d14b370dfb5221
Parents: 174faef
Author: randgalt <ra...@apache.org>
Authored: Mon Jul 10 11:05:57 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jul 10 11:05:57 2017 -0500

----------------------------------------------------------------------
 .../apache/curator/framework/recipes/shared/SharedValue.java | 5 +++--
 .../curator/framework/recipes/shared/TestSharedCount.java    | 8 +++-----
 2 files changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/5de6b818/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index 68fd5b5..5d7abce 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@ -79,7 +79,7 @@ public class SharedValue implements Closeable, SharedValueReader
         public void stateChanged(CuratorFramework client, ConnectionState newState)
         {
             notifyListenerOfStateChanged(newState);
-            if ( newState == ConnectionState.RECONNECTED )
+            if ( newState.isConnected() )
             {
                 try
                 {
@@ -87,6 +87,7 @@ public class SharedValue implements Closeable, SharedValueReader
                 }
                 catch ( Exception e )
                 {
+                    ThreadUtils.checkInterrupted(e);
                     log.error("Could not read value after reconnect", e);
                 }
             }
@@ -115,7 +116,7 @@ public class SharedValue implements Closeable, SharedValueReader
     }
 
     @VisibleForTesting
-    protected SharedValue(CuratorFramework client, String path, byte[] seedValue, CuratorWatcher watcher)
+    protected SharedValue(WatcherRemoveCuratorFramework client, String path, byte[] seedValue, CuratorWatcher watcher)
     {
         this.client = client;
         this.path = PathUtils.validatePath(path);

http://git-wip-us.apache.org/repos/asf/curator/blob/5de6b818/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index d7ebb6c..3123c7d 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@ -516,18 +516,16 @@ public class TestSharedCount extends BaseClassForTests
 
     private SharedCount createSharedCountWithFaultyWatcher(CuratorFramework curatorFramework, String path, int val) {
 
-        class FaultyCuratorWatcher implements CuratorWatcher {
+        final CuratorWatcher faultyWatcher = new CuratorWatcher() {
             @Override
             public void process(WatchedEvent event) throws Exception {
                 // everything will be ignored
             }
-        }
-
-        final FaultyCuratorWatcher fautlyWatcher = new FaultyCuratorWatcher();
+        };
 
         class FaultySharedValue extends SharedValue {
             public FaultySharedValue(CuratorFramework client, String path, byte[] seedValue) {
-                super(client, path, seedValue, fautlyWatcher);
+                super(client.newWatcherRemoveCuratorFramework(), path, seedValue, faultyWatcher);
             }
         };
 


[5/5] curator git commit: closes #200 - PR accepted and merged

Posted by ra...@apache.org.
closes #200 - PR accepted and merged


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/12cc7cec
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/12cc7cec
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/12cc7cec

Branch: refs/heads/master
Commit: 12cc7cec5dcb0cc512ec5900426fde74e3289614
Parents: 7442357
Author: randgalt <ra...@apache.org>
Authored: Mon Jul 10 12:10:46 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jul 10 12:10:46 2017 -0500

----------------------------------------------------------------------

----------------------------------------------------------------------



[4/5] curator git commit: fixed flakiness with TestSharedCount tests

Posted by ra...@apache.org.
fixed flakiness with TestSharedCount tests


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/7442357b
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/7442357b
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/7442357b

Branch: refs/heads/master
Commit: 7442357b7e6dc76ff76c6bc6a3d361b65815f626
Parents: 5de6b81
Author: randgalt <ra...@apache.org>
Authored: Mon Jul 10 12:00:16 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jul 10 12:00:16 2017 -0500

----------------------------------------------------------------------
 .../framework/recipes/shared/TestSharedCount.java      | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/7442357b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index 3123c7d..6a0b7c2 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@ -78,6 +78,7 @@ public class TestSharedCount extends BaseClassForTests
                                 CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
                                 clients.add(client);
                                 client.start();
+                                client.checkExists().forPath("/");  // clear initial connect event
 
                                 SharedCount count = new SharedCount(client, "/count", 10);
                                 counts.add(count);
@@ -121,6 +122,7 @@ public class TestSharedCount extends BaseClassForTests
             CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
             clients.add(client);
             client.start();
+            client.checkExists().forPath("/");  // clear initial connect event
 
             Assert.assertTrue(startLatch.await(10, TimeUnit.SECONDS));
 
@@ -438,13 +440,14 @@ public class TestSharedCount extends BaseClassForTests
     @Test
     public void testDisconnectReconnectWithMultipleClients() throws Exception
     {
+        Timing timing = new Timing();
         CuratorFramework curatorFramework1 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
         CuratorFramework curatorFramework2 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
 
         curatorFramework1.start();
-        curatorFramework1.blockUntilConnected();
+        curatorFramework1.checkExists().forPath("/");   // clear initial connect events
         curatorFramework2.start();
-        curatorFramework2.blockUntilConnected();
+        curatorFramework2.checkExists().forPath("/");   // clear initial connect events
 
         final String sharedCountPath = "/count";
         final int initialCount = 10;
@@ -485,7 +488,7 @@ public class TestSharedCount extends BaseClassForTests
         try
         {
             sharedCount1.setCount(12);
-            Assert.assertEquals(listener1.gotChangeEvent.awaitAdvanceInterruptibly(0, 2, TimeUnit.SECONDS), 1);
+            Assert.assertEquals(listener1.gotChangeEvent.awaitAdvanceInterruptibly(0, timing.seconds(), TimeUnit.SECONDS), 1);
             Assert.assertEquals(sharedCount1.getCount(), 12);
 
             Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 10);
@@ -498,10 +501,10 @@ public class TestSharedCount extends BaseClassForTests
 
                 server.restart();
 
-                Assert.assertEquals(listener2.getReconnectEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1);
+                Assert.assertEquals(listener2.getReconnectEvent.awaitAdvanceInterruptibly(i, timing.forWaiting().seconds(), TimeUnit.SECONDS), i + 1);
                 // CURATOR-311 introduces to Curator's client reading server's shared count value
                 // when client's state gets ConnectionState.RECONNECTED. Following tests ensures that.
-                Assert.assertEquals(listener2.gotChangeEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1);
+                Assert.assertEquals(listener2.gotChangeEvent.awaitAdvanceInterruptibly(i, timing.forWaiting().seconds(), TimeUnit.SECONDS), i + 1);
                 Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 13 + i);
             }
         }


[2/5] curator git commit: Merge branch 'CURATOR-311' of github.com:oza/curator into CURATOR-311

Posted by ra...@apache.org.
Merge branch 'CURATOR-311' of github.com:oza/curator into CURATOR-311


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/174faef5
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/174faef5
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/174faef5

Branch: refs/heads/master
Commit: 174faef5f0de10626c616d2a25eb9fb1e5572966
Parents: 0f5d10d 8c1c5ff
Author: randgalt <ra...@apache.org>
Authored: Mon Jul 10 10:55:00 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jul 10 10:55:00 2017 -0500

----------------------------------------------------------------------
 .../framework/recipes/shared/SharedCount.java   |   5 +
 .../framework/recipes/shared/SharedValue.java   |  27 ++++-
 .../recipes/shared/TestSharedCount.java         | 116 ++++++++++++++++++-
 3 files changed, 146 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/174faef5/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index 1a3d889,7e3f26a..68fd5b5
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@@ -94,9 -106,21 +107,21 @@@ public class SharedValue implements Clo
       */
      public SharedValue(CuratorFramework client, String path, byte[] seedValue)
      {
 -        this.client = client;
 +        this.client = client.newWatcherRemoveCuratorFramework();
          this.path = PathUtils.validatePath(path);
          this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
+         this.watcher = new SharedValueCuratorWatcher();
+         currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
+     }
+ 
+     @VisibleForTesting
+     protected SharedValue(CuratorFramework client, String path, byte[] seedValue, CuratorWatcher watcher)
+     {
+         this.client = client;
+         this.path = PathUtils.validatePath(path);
+         this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
+         // inject watcher for testing
+         this.watcher = watcher;
          currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
      }
  

http://git-wip-us.apache.org/repos/asf/curator/blob/174faef5/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index 0690d6a,330c8f4..d7ebb6c
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@@ -23,9 -23,9 +23,10 @@@ import com.google.common.collect.Lists
  import com.google.common.util.concurrent.ThreadFactoryBuilder;
  import org.apache.curator.framework.CuratorFramework;
  import org.apache.curator.framework.CuratorFrameworkFactory;
 +import org.apache.curator.framework.imps.TestCleanState;
  import org.apache.curator.framework.api.BackgroundCallback;
  import org.apache.curator.framework.api.CuratorEvent;
+ import org.apache.curator.framework.api.CuratorWatcher;
  import org.apache.curator.framework.state.ConnectionState;
  import org.apache.curator.framework.state.ConnectionStateListener;
  import org.apache.curator.retry.RetryNTimes;
@@@ -424,7 -392,115 +430,115 @@@ public class TestSharedCount extends Ba
          finally
          {
              CloseableUtils.closeQuietly(sharedCount);
 -            CloseableUtils.closeQuietly(curatorFramework);
 +            TestCleanState.closeAndTestClean(curatorFramework);
          }
      }
+ 
+ 
+     @Test
+     public void testDisconnectReconnectWithMultipleClients() throws Exception
+     {
+         CuratorFramework curatorFramework1 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
+         CuratorFramework curatorFramework2 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
+ 
+         curatorFramework1.start();
+         curatorFramework1.blockUntilConnected();
+         curatorFramework2.start();
+         curatorFramework2.blockUntilConnected();
+ 
+         final String sharedCountPath = "/count";
+         final int initialCount = 10;
+         SharedCount sharedCount1 = new SharedCount(curatorFramework1, sharedCountPath, initialCount);
+         SharedCount sharedCountWithFaultyWatcher = createSharedCountWithFaultyWatcher(curatorFramework2, sharedCountPath, initialCount);
+ 
+         class MySharedCountListener implements SharedCountListener
+         {
+             final public Phaser gotSuspendEvent = new Phaser(1);
+             final public Phaser gotChangeEvent = new Phaser(1);
+             final public Phaser getReconnectEvent = new Phaser(1);
+             final public AtomicInteger numChangeEvents = new AtomicInteger(0);
+ 
+             @Override
+             public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception
+             {
+                 numChangeEvents.incrementAndGet();
+                 gotChangeEvent.arrive();
+             }
+ 
+             @Override
+             public void stateChanged(CuratorFramework client, ConnectionState newState)
+             {
+                 if (newState == ConnectionState.SUSPENDED) {
+                     gotSuspendEvent.arrive();
+                 } else if (newState == ConnectionState.RECONNECTED) {
+                     getReconnectEvent.arrive();
+                 }
+             }
+         }
+ 
+         MySharedCountListener listener1 = new MySharedCountListener();
+         sharedCount1.addListener(listener1);
+         sharedCount1.start();
+         MySharedCountListener listener2 = new MySharedCountListener();
+         sharedCountWithFaultyWatcher.addListener(listener2);
+ 
+         try
+         {
+             sharedCount1.setCount(12);
+             Assert.assertEquals(listener1.gotChangeEvent.awaitAdvanceInterruptibly(0, 2, TimeUnit.SECONDS), 1);
+             Assert.assertEquals(sharedCount1.getCount(), 12);
+ 
+             Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 10);
+             // new counter with faultyWatcher start
+             sharedCountWithFaultyWatcher.start();
+ 
+             for (int i = 0; i < 10; i++) {
+                 sharedCount1.setCount(13 + i);
+                 Assert.assertEquals(sharedCount1.getCount(), 13 + i);
+ 
+                 server.restart();
+ 
+                 Assert.assertEquals(listener2.getReconnectEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1);
+                 // CURATOR-311 introduces to Curator's client reading server's shared count value
+                 // when client's state gets ConnectionState.RECONNECTED. Following tests ensures that.
+                 Assert.assertEquals(listener2.gotChangeEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1);
+                 Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 13 + i);
+             }
+         }
+         finally
+         {
+             CloseableUtils.closeQuietly(sharedCount1);
+             CloseableUtils.closeQuietly(curatorFramework1);
+             CloseableUtils.closeQuietly(sharedCountWithFaultyWatcher);
+             CloseableUtils.closeQuietly(curatorFramework2);
+         }
+     }
+ 
+     private SharedCount createSharedCountWithFaultyWatcher(CuratorFramework curatorFramework, String path, int val) {
+ 
+         class FaultyCuratorWatcher implements CuratorWatcher {
+             @Override
+             public void process(WatchedEvent event) throws Exception {
+                 // everything will be ignored
+             }
+         }
+ 
+         final FaultyCuratorWatcher fautlyWatcher = new FaultyCuratorWatcher();
+ 
+         class FaultySharedValue extends SharedValue {
+             public FaultySharedValue(CuratorFramework client, String path, byte[] seedValue) {
+                 super(client, path, seedValue, fautlyWatcher);
+             }
+         };
+ 
+         final SharedValue faultySharedValue = new FaultySharedValue(curatorFramework, path, SharedCount.toBytes(val));
+         class FaultySharedCount extends SharedCount {
+             public FaultySharedCount(CuratorFramework client, String path, int val) {
+                 super(client, path, faultySharedValue);
+             }
+         };
+         return new FaultySharedCount(curatorFramework, path, val);
+     }
+ 
+ 
  }