You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/07/10 17:09:06 UTC

[1/2] curator git commit: Make sure readValueAndNotifyListenersInBackground() is called after a connection problem

Repository: curator
Updated Branches:
  refs/heads/CURATOR-2.0 c7df8e251 -> 6af5f367e


Make sure readValueAndNotifyListenersInBackground() is called after a connection problem


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/3e159bdd
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/3e159bdd
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/3e159bdd

Branch: refs/heads/CURATOR-2.0
Commit: 3e159bddb91c6e8b4e9e27ccbf00e06b4f35638e
Parents: c7df8e2
Author: randgalt <ra...@apache.org>
Authored: Mon Jul 10 11:05:57 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jul 10 11:21:57 2017 -0500

----------------------------------------------------------------------
 .../framework/recipes/shared/SharedValue.java   |  30 +++-
 .../recipes/shared/TestSharedCount.java         | 148 ++++++++++++++++++-
 2 files changed, 175 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/3e159bdd/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index 1f9df37..5478a8f 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@ -19,6 +19,7 @@
 
 package org.apache.curator.framework.recipes.shared;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
@@ -56,8 +57,9 @@ public class SharedValue implements Closeable, SharedValueReader
     private final byte[] seedValue;
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
     private final AtomicReference<VersionedValue<byte[]>> currentValue;
+    private final CuratorWatcher watcher;
 
-    private final CuratorWatcher watcher = new CuratorWatcher()
+    private class SharedValueCuratorWatcher implements CuratorWatcher
     {
         @Override
         public void process(WatchedEvent event) throws Exception
@@ -68,7 +70,7 @@ public class SharedValue implements Closeable, SharedValueReader
                 readValueAndNotifyListenersInBackground();
             }
         }
-    };
+    }
 
     private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
     {
@@ -76,6 +78,18 @@ public class SharedValue implements Closeable, SharedValueReader
         public void stateChanged(CuratorFramework client, ConnectionState newState)
         {
             notifyListenerOfStateChanged(newState);
+            if ( newState.isConnected() )
+            {
+                try
+                {
+                    readValueAndNotifyListenersInBackground();
+                }
+                catch ( Exception e )
+                {
+                    ThreadUtils.checkInterrupted(e);
+                    log.error("Could not read value after reconnect", e);
+                }
+            }
         }
     };
 
@@ -96,6 +110,18 @@ public class SharedValue implements Closeable, SharedValueReader
         this.client = client;
         this.path = PathUtils.validatePath(path);
         this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
+        this.watcher = new SharedValueCuratorWatcher();
+        currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
+    }
+
+    @VisibleForTesting
+    protected SharedValue(CuratorFramework client, String path, byte[] seedValue, CuratorWatcher watcher)
+    {
+        this.client = client;
+        this.path = PathUtils.validatePath(path);
+        this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
+        // inject watcher for testing
+        this.watcher = watcher;
         currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
     }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/3e159bdd/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index 7939f6e..a6a32e9 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@ -25,6 +25,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorWatcher;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryNTimes;
@@ -32,6 +33,7 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.WatchedEvent;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.List;
@@ -42,6 +44,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -167,10 +170,29 @@ public class TestSharedCount extends BaseClassForTests
             client.start();
             count.start();
 
+            final CountDownLatch setLatch = new CountDownLatch(3);
+            SharedCountListener listener = new SharedCountListener()
+            {
+                @Override
+                public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception
+                {
+                    setLatch.countDown();
+                }
+
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                    // nop
+                }
+            };
+            count.addListener(listener);
+
             Assert.assertTrue(count.trySetCount(1));
             Assert.assertTrue(count.trySetCount(2));
             Assert.assertTrue(count.trySetCount(10));
             Assert.assertEquals(count.getCount(), 10);
+
+            Assert.assertTrue(new Timing().awaitLatch(setLatch));
         }
         finally
         {
@@ -246,12 +268,30 @@ public class TestSharedCount extends BaseClassForTests
             Assert.assertTrue(count2.trySetCount(versionedValue, 20));
             timing.sleepABit();
 
+            final CountDownLatch setLatch = new CountDownLatch(2);
+            SharedCountListener listener = new SharedCountListener()
+            {
+                @Override
+                public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception
+                {
+                    setLatch.countDown();
+                }
+
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                    // nop
+                }
+            };
+            count1.addListener(listener);
             VersionedValue<Integer> versionedValue1 = count1.getVersionedValue();
             VersionedValue<Integer> versionedValue2 = count2.getVersionedValue();
             Assert.assertTrue(count2.trySetCount(versionedValue2, 30));
             Assert.assertFalse(count1.trySetCount(versionedValue1, 40));
+
             versionedValue1 = count1.getVersionedValue();
             Assert.assertTrue(count1.trySetCount(versionedValue1, 40));
+            Assert.assertTrue(timing.awaitLatch(setLatch));
         }
         finally
         {
@@ -368,6 +408,7 @@ public class TestSharedCount extends BaseClassForTests
 
             server.restart();
             Assert.assertTrue(getReconnectEvent.await(2, TimeUnit.SECONDS));
+            Assert.assertEquals(numChangeEvents.get(), 1);
 
             sharedCount.trySetCount(sharedCount.getVersionedValue(), 12);
 
@@ -381,7 +422,9 @@ public class TestSharedCount extends BaseClassForTests
             }).forPath("/count");
             flushDone.await(5, TimeUnit.SECONDS);
 
-            Assert.assertEquals(2, numChangeEvents.get());
+            // CURATOR-311: when the Curator client's state becomes RECONNECTED, countHasChanged is invoked again
+            // because SharedValue's ConnectionStateListener#stateChanged calls readValueAndNotifyListenersInBackground.
+            Assert.assertEquals(numChangeEvents.get(), 3);
         }
         finally
         {
@@ -389,4 +432,107 @@ public class TestSharedCount extends BaseClassForTests
             CloseableUtils.closeQuietly(curatorFramework);
         }
     }
+
+    @Test
+    public void testDisconnectReconnectWithMultipleClients() throws Exception
+    {
+        CuratorFramework curatorFramework1 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
+        CuratorFramework curatorFramework2 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
+
+        curatorFramework1.start();
+        curatorFramework1.blockUntilConnected();
+        curatorFramework2.start();
+        curatorFramework2.blockUntilConnected();
+
+        final String sharedCountPath = "/count";
+        final int initialCount = 10;
+        SharedCount sharedCount1 = new SharedCount(curatorFramework1, sharedCountPath, initialCount);
+        SharedCount sharedCountWithFaultyWatcher = createSharedCountWithFaultyWatcher(curatorFramework2, sharedCountPath, initialCount);
+
+        class MySharedCountListener implements SharedCountListener
+        {
+            final public Phaser gotSuspendEvent = new Phaser(1);
+            final public Phaser gotChangeEvent = new Phaser(1);
+            final public Phaser getReconnectEvent = new Phaser(1);
+            final public AtomicInteger numChangeEvents = new AtomicInteger(0);
+
+            @Override
+            public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception
+            {
+                numChangeEvents.incrementAndGet();
+                gotChangeEvent.arrive();
+            }
+
+            @Override
+            public void stateChanged(CuratorFramework client, ConnectionState newState)
+            {
+                if (newState == ConnectionState.SUSPENDED) {
+                    gotSuspendEvent.arrive();
+                } else if (newState.isConnected()) {
+                    getReconnectEvent.arrive();
+                }
+            }
+        }
+
+        MySharedCountListener listener1 = new MySharedCountListener();
+        sharedCount1.addListener(listener1);
+        sharedCount1.start();
+        MySharedCountListener listener2 = new MySharedCountListener();
+        sharedCountWithFaultyWatcher.addListener(listener2);
+
+        try
+        {
+            sharedCount1.setCount(12);
+            Assert.assertEquals(listener1.gotChangeEvent.awaitAdvanceInterruptibly(0, 2, TimeUnit.SECONDS), 1);
+            Assert.assertEquals(sharedCount1.getCount(), 12);
+
+            Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 10);
+            // new counter with faultyWatcher start
+            sharedCountWithFaultyWatcher.start();
+
+            for (int i = 0; i < 10; i++) {
+                sharedCount1.setCount(13 + i);
+                Assert.assertEquals(sharedCount1.getCount(), 13 + i);
+
+                server.restart();
+
+                Assert.assertEquals(listener2.getReconnectEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1);
+                // CURATOR-311 introduces to Curator's client reading server's shared count value
+                // when client's state gets ConnectionState.RECONNECTED. Following tests ensures that.
+                Assert.assertEquals(listener2.gotChangeEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1);
+                Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 13 + i);
+            }
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(sharedCount1);
+            CloseableUtils.closeQuietly(curatorFramework1);
+            CloseableUtils.closeQuietly(sharedCountWithFaultyWatcher);
+            CloseableUtils.closeQuietly(curatorFramework2);
+        }
+    }
+
+    private SharedCount createSharedCountWithFaultyWatcher(CuratorFramework curatorFramework, String path, int val) {
+
+        final CuratorWatcher faultyWatcher = new CuratorWatcher() {
+            @Override
+            public void process(WatchedEvent event) throws Exception {
+                // everything will be ignored
+            }
+        };
+
+        class FaultySharedValue extends SharedValue {
+            public FaultySharedValue(CuratorFramework client, String path, byte[] seedValue) {
+                super(client, path, seedValue, faultyWatcher);
+            }
+        };
+
+        final SharedValue faultySharedValue = new FaultySharedValue(curatorFramework, path, SharedCount.toBytes(val));
+        class FaultySharedCount extends SharedCount {
+            public FaultySharedCount(CuratorFramework client, String path, int val) {
+                super(client, path, val);
+            }
+        };
+        return new FaultySharedCount(curatorFramework, path, val);
+    }
 }


[2/2] curator git commit: fixed some merge problems from CURATOR-311 merge

Posted by ra...@apache.org.
fixed some merge problems from CURATOR-311 merge


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/6af5f367
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/6af5f367
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/6af5f367

Branch: refs/heads/CURATOR-2.0
Commit: 6af5f367efd238c7e61f0b9ea341d69c7fdaa736
Parents: 3e159bd
Author: randgalt <ra...@apache.org>
Authored: Mon Jul 10 12:09:01 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jul 10 12:09:01 2017 -0500

----------------------------------------------------------------------
 .../framework/recipes/shared/SharedCount.java      |  5 +++++
 .../framework/recipes/shared/TestSharedCount.java  | 17 ++++++++++-------
 2 files changed, 15 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/6af5f367/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
index 87fffdd..bdfa844 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
@@ -49,6 +49,11 @@ public class SharedCount implements Closeable, SharedCountReader, Listenable<Sha
         sharedValue = new SharedValue(client, path, toBytes(seedValue));
     }
 
+    protected SharedCount(CuratorFramework client, String path, SharedValue sv)
+    {
+        sharedValue = sv;
+    }
+
     @Override
     public int getCount()
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/6af5f367/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index a6a32e9..2ef93eb 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@ -77,6 +77,7 @@ public class TestSharedCount extends BaseClassForTests
                                 CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
                                 clients.add(client);
                                 client.start();
+                                client.checkExists().forPath("/");  // clear initial connect event
 
                                 SharedCount count = new SharedCount(client, "/count", 10);
                                 counts.add(count);
@@ -120,6 +121,7 @@ public class TestSharedCount extends BaseClassForTests
             CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
             clients.add(client);
             client.start();
+            client.checkExists().forPath("/");  // clear initial connect event
 
             Assert.assertTrue(startLatch.await(10, TimeUnit.SECONDS));
 
@@ -436,13 +438,14 @@ public class TestSharedCount extends BaseClassForTests
     @Test
     public void testDisconnectReconnectWithMultipleClients() throws Exception
     {
+        Timing timing = new Timing();
         CuratorFramework curatorFramework1 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
         CuratorFramework curatorFramework2 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
 
         curatorFramework1.start();
-        curatorFramework1.blockUntilConnected();
+        curatorFramework1.checkExists().forPath("/");   // clear initial connect events
         curatorFramework2.start();
-        curatorFramework2.blockUntilConnected();
+        curatorFramework2.checkExists().forPath("/");   // clear initial connect events
 
         final String sharedCountPath = "/count";
         final int initialCount = 10;
@@ -468,7 +471,7 @@ public class TestSharedCount extends BaseClassForTests
             {
                 if (newState == ConnectionState.SUSPENDED) {
                     gotSuspendEvent.arrive();
-                } else if (newState.isConnected()) {
+                } else if (newState == ConnectionState.RECONNECTED) {
                     getReconnectEvent.arrive();
                 }
             }
@@ -483,7 +486,7 @@ public class TestSharedCount extends BaseClassForTests
         try
         {
             sharedCount1.setCount(12);
-            Assert.assertEquals(listener1.gotChangeEvent.awaitAdvanceInterruptibly(0, 2, TimeUnit.SECONDS), 1);
+            Assert.assertEquals(listener1.gotChangeEvent.awaitAdvanceInterruptibly(0, timing.forWaiting().seconds(), TimeUnit.SECONDS), 1);
             Assert.assertEquals(sharedCount1.getCount(), 12);
 
             Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 10);
@@ -496,10 +499,10 @@ public class TestSharedCount extends BaseClassForTests
 
                 server.restart();
 
-                Assert.assertEquals(listener2.getReconnectEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1);
+                Assert.assertEquals(listener2.getReconnectEvent.awaitAdvanceInterruptibly(i, timing.forWaiting().seconds(), TimeUnit.SECONDS), i + 1);
                 // CURATOR-311 introduces to Curator's client reading server's shared count value
                 // when client's state gets ConnectionState.RECONNECTED. Following tests ensures that.
-                Assert.assertEquals(listener2.gotChangeEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1);
+                Assert.assertEquals(listener2.gotChangeEvent.awaitAdvanceInterruptibly(i, timing.forWaiting().seconds(), TimeUnit.SECONDS), i + 1);
                 Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 13 + i);
             }
         }
@@ -530,7 +533,7 @@ public class TestSharedCount extends BaseClassForTests
         final SharedValue faultySharedValue = new FaultySharedValue(curatorFramework, path, SharedCount.toBytes(val));
         class FaultySharedCount extends SharedCount {
             public FaultySharedCount(CuratorFramework client, String path, int val) {
-                super(client, path, val);
+                super(client, path, faultySharedValue);
             }
         };
         return new FaultySharedCount(curatorFramework, path, val);