You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ca...@apache.org on 2016/08/31 01:01:52 UTC

[1/3] curator git commit: CURATOR-340 - Updated Zookeeper version to 3.4.8

Repository: curator
Updated Branches:
  refs/heads/CURATOR-3.0 9612c5fbf -> 9e400bc86


CURATOR-340 - Updated Zookeeper version to 3.4.8


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/ef33ccb1
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/ef33ccb1
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/ef33ccb1

Branch: refs/heads/CURATOR-3.0
Commit: ef33ccb11a2947d6e6598714d79acad6259df454
Parents: 6cebfc1
Author: Cam McKenzie <ca...@apache.org>
Authored: Mon Aug 8 13:42:46 2016 +1000
Committer: Cam McKenzie <ca...@apache.org>
Committed: Mon Aug 8 13:42:46 2016 +1000

----------------------------------------------------------------------
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/ef33ccb1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 50b7ebd..7e8cf9f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -61,7 +61,7 @@
         <surefire-forkcount>1</surefire-forkcount>
 
         <!-- versions -->
-        <zookeeper-version>3.4.6</zookeeper-version>
+        <zookeeper-version>3.4.8</zookeeper-version>
         <maven-project-info-reports-plugin-version>2.7</maven-project-info-reports-plugin-version>
         <maven-bundle-plugin-version>2.3.7</maven-bundle-plugin-version>
         <maven-javadoc-plugin-version>2.10.3</maven-javadoc-plugin-version>


[3/3] curator git commit: Merge branch 'master' into CURATOR-3.0

Posted by ca...@apache.org.
Merge branch 'master' into CURATOR-3.0

Conflicts:
	curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
	curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
	pom.xml


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/9e400bc8
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/9e400bc8
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/9e400bc8

Branch: refs/heads/CURATOR-3.0
Commit: 9e400bc8620430c245db86d08bafafe2ecb8534f
Parents: 9612c5f 022de39
Author: Cam McKenzie <ca...@apache.org>
Authored: Wed Aug 31 11:01:23 2016 +1000
Committer: Cam McKenzie <ca...@apache.org>
Committed: Wed Aug 31 11:01:23 2016 +1000

----------------------------------------------------------------------
 .../framework/recipes/shared/SharedValue.java   |  24 ++++-
 .../recipes/shared/TestSharedCount.java         | 106 +++++++++++++++++++
 2 files changed, 127 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/9e400bc8/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index 7c2febd,1f9df37..1a3d889
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@@ -22,7 -22,8 +22,9 @@@ package org.apache.curator.framework.re
  import com.google.common.base.Function;
  import com.google.common.base.Preconditions;
  import org.apache.curator.framework.CuratorFramework;
 +import org.apache.curator.framework.WatcherRemoveCuratorFramework;
+ import org.apache.curator.framework.api.BackgroundCallback;
+ import org.apache.curator.framework.api.CuratorEvent;
  import org.apache.curator.framework.api.CuratorWatcher;
  import org.apache.curator.framework.listen.ListenerContainer;
  import org.apache.curator.framework.state.ConnectionState;

http://git-wip-us.apache.org/repos/asf/curator/blob/9e400bc8/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index 28df3f9,7939f6e..0690d6a
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@@ -23,8 -23,11 +23,12 @@@ import com.google.common.collect.Lists
  import com.google.common.util.concurrent.ThreadFactoryBuilder;
  import org.apache.curator.framework.CuratorFramework;
  import org.apache.curator.framework.CuratorFrameworkFactory;
 +import org.apache.curator.framework.imps.TestCleanState;
+ import org.apache.curator.framework.api.BackgroundCallback;
+ import org.apache.curator.framework.api.CuratorEvent;
  import org.apache.curator.framework.state.ConnectionState;
+ import org.apache.curator.framework.state.ConnectionStateListener;
+ import org.apache.curator.retry.RetryNTimes;
  import org.apache.curator.retry.RetryOneTime;
  import org.apache.curator.test.BaseClassForTests;
  import org.apache.curator.test.Timing;
@@@ -321,4 -288,105 +326,105 @@@ public class TestSharedCount extends Ba
              CloseableUtils.closeQuietly(client1);
          }
      }
+ 
+ 
+     @Test
+     public void testDisconnectEventOnWatcherDoesNotRetry() throws Exception
+     {
+         final CountDownLatch gotSuspendEvent = new CountDownLatch(1);
+ 
+         CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 1000));
+         curatorFramework.start();
+         curatorFramework.blockUntilConnected();
+ 
+         SharedCount sharedCount = new SharedCount(curatorFramework, "/count", 10);
+         sharedCount.start();
+ 
+         curatorFramework.getConnectionStateListenable().addListener(new ConnectionStateListener() {
+             @Override
+             public void stateChanged(CuratorFramework client, ConnectionState newState) {
+                 if (newState == ConnectionState.SUSPENDED) {
+                     gotSuspendEvent.countDown();
+                 }
+             }
+         });
+ 
+         try
+         {
+             server.stop();
+             // if the watcher goes into a 10-second retry loop we won't get timely notification
+             Assert.assertTrue(gotSuspendEvent.await(5, TimeUnit.SECONDS));
+         }
+         finally
+         {
+             CloseableUtils.closeQuietly(sharedCount);
 -            CloseableUtils.closeQuietly(curatorFramework);
++            TestCleanState.closeAndTestClean(curatorFramework);
+         }
+     }
+ 
+     @Test
+     public void testDisconnectReconnectEventDoesNotFireValueWatcher() throws Exception
+     {
+         final CountDownLatch gotSuspendEvent = new CountDownLatch(1);
+         final CountDownLatch gotChangeEvent = new CountDownLatch(1);
+         final CountDownLatch getReconnectEvent = new CountDownLatch(1);
+ 
+         final AtomicInteger numChangeEvents = new AtomicInteger(0);
+ 
+ 
+         CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
+         curatorFramework.start();
+         curatorFramework.blockUntilConnected();
+ 
+         SharedCount sharedCount = new SharedCount(curatorFramework, "/count", 10);
+ 
+         sharedCount.addListener(new SharedCountListener() {
+             @Override
+             public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
+                 numChangeEvents.incrementAndGet();
+                 gotChangeEvent.countDown();
+             }
+ 
+             @Override
+             public void stateChanged(CuratorFramework client, ConnectionState newState) {
+                 if (newState == ConnectionState.SUSPENDED) {
+                     gotSuspendEvent.countDown();
+                 } else if (newState == ConnectionState.RECONNECTED) {
+                     getReconnectEvent.countDown();
+                 }
+             }
+         });
+         sharedCount.start();
+ 
+         try
+         {
+             sharedCount.setCount(11);
+             Assert.assertTrue(gotChangeEvent.await(2, TimeUnit.SECONDS));
+ 
+             server.stop();
+             Assert.assertTrue(gotSuspendEvent.await(2, TimeUnit.SECONDS));
+ 
+             server.restart();
+             Assert.assertTrue(getReconnectEvent.await(2, TimeUnit.SECONDS));
+ 
+             sharedCount.trySetCount(sharedCount.getVersionedValue(), 12);
+ 
+             // flush background task queue
+             final CountDownLatch flushDone = new CountDownLatch(1);
+             curatorFramework.getData().inBackground(new BackgroundCallback() {
+                 @Override
+                 public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+                     flushDone.countDown();
+                 }
+             }).forPath("/count");
+             flushDone.await(5, TimeUnit.SECONDS);
+ 
+             Assert.assertEquals(2, numChangeEvents.get());
+         }
+         finally
+         {
+             CloseableUtils.closeQuietly(sharedCount);
 -            CloseableUtils.closeQuietly(curatorFramework);
++            TestCleanState.closeAndTestClean(curatorFramework);
+         }
+     }
  }


[2/3] curator git commit: CURATOR-344 - do watch event processing tasks in the background and limit shared value watcher to valid change events to avoid work on disconnect

Posted by ca...@apache.org.
CURATOR-344 - do watch event processing tasks in the background and limit shared value watcher to valid change events to avoid work on disconnect


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/022de392
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/022de392
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/022de392

Branch: refs/heads/CURATOR-3.0
Commit: 022de3921a120c6f86cc6e21442327cc04b66cd2
Parents: ef33ccb
Author: gtully <ga...@gmail.com>
Authored: Thu Aug 18 19:34:10 2016 +0100
Committer: gtully <ga...@gmail.com>
Committed: Tue Aug 30 13:09:56 2016 +0100

----------------------------------------------------------------------
 .../framework/recipes/shared/SharedValue.java   |  24 ++++-
 .../recipes/shared/TestSharedCount.java         | 106 +++++++++++++++++++
 2 files changed, 127 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/022de392/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index dddc471..1f9df37 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@ -22,6 +22,8 @@ package org.apache.curator.framework.recipes.shared;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorWatcher;
 import org.apache.curator.framework.listen.ListenerContainer;
 import org.apache.curator.framework.state.ConnectionState;
@@ -30,6 +32,7 @@ import org.apache.curator.utils.PathUtils;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,10 +62,10 @@ public class SharedValue implements Closeable, SharedValueReader
         @Override
         public void process(WatchedEvent event) throws Exception
         {
-            if ( state.get() == State.STARTED )
+            if ( state.get() == State.STARTED && event.getType() != Watcher.Event.EventType.None )
             {
-                readValue();
-                notifyListeners();
+                // don't block event thread in possible retry
+                readValueAndNotifyListenersInBackground();
             }
         }
     };
@@ -248,6 +251,21 @@ public class SharedValue implements Closeable, SharedValueReader
         updateValue(localStat.getVersion(), bytes);
     }
 
+    private final BackgroundCallback upadateAndNotifyListenerCallback = new BackgroundCallback() {
+        @Override
+        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+            if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
+                updateValue(event.getStat().getVersion(), event.getData());
+                notifyListeners();
+            }
+        }
+    };
+
+    private void readValueAndNotifyListenersInBackground() throws Exception
+    {
+        client.getData().usingWatcher(watcher).inBackground(upadateAndNotifyListenerCallback).forPath(path);
+    }
+
     private void notifyListeners()
     {
         final byte[] localValue = getValue();

http://git-wip-us.apache.org/repos/asf/curator/blob/022de392/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index a1f4d8c..7939f6e 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@ -23,7 +23,11 @@ import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryNTimes;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
@@ -40,6 +44,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class TestSharedCount extends BaseClassForTests
 {
@@ -283,4 +288,105 @@ public class TestSharedCount extends BaseClassForTests
             CloseableUtils.closeQuietly(client1);
         }
     }
+
+
+    @Test
+    public void testDisconnectEventOnWatcherDoesNotRetry() throws Exception
+    {
+        final CountDownLatch gotSuspendEvent = new CountDownLatch(1);
+
+        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 1000));
+        curatorFramework.start();
+        curatorFramework.blockUntilConnected();
+
+        SharedCount sharedCount = new SharedCount(curatorFramework, "/count", 10);
+        sharedCount.start();
+
+        curatorFramework.getConnectionStateListenable().addListener(new ConnectionStateListener() {
+            @Override
+            public void stateChanged(CuratorFramework client, ConnectionState newState) {
+                if (newState == ConnectionState.SUSPENDED) {
+                    gotSuspendEvent.countDown();
+                }
+            }
+        });
+
+        try
+        {
+            server.stop();
+            // if the watcher goes into a 10-second retry loop we won't get timely notification
+            Assert.assertTrue(gotSuspendEvent.await(5, TimeUnit.SECONDS));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(sharedCount);
+            CloseableUtils.closeQuietly(curatorFramework);
+        }
+    }
+
+    @Test
+    public void testDisconnectReconnectEventDoesNotFireValueWatcher() throws Exception
+    {
+        final CountDownLatch gotSuspendEvent = new CountDownLatch(1);
+        final CountDownLatch gotChangeEvent = new CountDownLatch(1);
+        final CountDownLatch getReconnectEvent = new CountDownLatch(1);
+
+        final AtomicInteger numChangeEvents = new AtomicInteger(0);
+
+
+        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
+        curatorFramework.start();
+        curatorFramework.blockUntilConnected();
+
+        SharedCount sharedCount = new SharedCount(curatorFramework, "/count", 10);
+
+        sharedCount.addListener(new SharedCountListener() {
+            @Override
+            public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
+                numChangeEvents.incrementAndGet();
+                gotChangeEvent.countDown();
+            }
+
+            @Override
+            public void stateChanged(CuratorFramework client, ConnectionState newState) {
+                if (newState == ConnectionState.SUSPENDED) {
+                    gotSuspendEvent.countDown();
+                } else if (newState == ConnectionState.RECONNECTED) {
+                    getReconnectEvent.countDown();
+                }
+            }
+        });
+        sharedCount.start();
+
+        try
+        {
+            sharedCount.setCount(11);
+            Assert.assertTrue(gotChangeEvent.await(2, TimeUnit.SECONDS));
+
+            server.stop();
+            Assert.assertTrue(gotSuspendEvent.await(2, TimeUnit.SECONDS));
+
+            server.restart();
+            Assert.assertTrue(getReconnectEvent.await(2, TimeUnit.SECONDS));
+
+            sharedCount.trySetCount(sharedCount.getVersionedValue(), 12);
+
+            // flush background task queue
+            final CountDownLatch flushDone = new CountDownLatch(1);
+            curatorFramework.getData().inBackground(new BackgroundCallback() {
+                @Override
+                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+                    flushDone.countDown();
+                }
+            }).forPath("/count");
+            flushDone.await(5, TimeUnit.SECONDS);
+
+            Assert.assertEquals(2, numChangeEvents.get());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(sharedCount);
+            CloseableUtils.closeQuietly(curatorFramework);
+        }
+    }
 }