You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/07/16 21:06:20 UTC

[03/21] curator git commit: Merge branch 'CURATOR-311' of github.com:oza/curator into CURATOR-311

Merge branch 'CURATOR-311' of github.com:oza/curator into CURATOR-311


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/174faef5
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/174faef5
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/174faef5

Branch: refs/heads/CURATOR-419
Commit: 174faef5f0de10626c616d2a25eb9fb1e5572966
Parents: 0f5d10d 8c1c5ff
Author: randgalt <ra...@apache.org>
Authored: Mon Jul 10 10:55:00 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jul 10 10:55:00 2017 -0500

----------------------------------------------------------------------
 .../framework/recipes/shared/SharedCount.java   |   5 +
 .../framework/recipes/shared/SharedValue.java   |  27 ++++-
 .../recipes/shared/TestSharedCount.java         | 116 ++++++++++++++++++-
 3 files changed, 146 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/174faef5/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index 1a3d889,7e3f26a..68fd5b5
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@@ -94,9 -106,21 +107,21 @@@ public class SharedValue implements Clo
       */
      public SharedValue(CuratorFramework client, String path, byte[] seedValue)
      {
 -        this.client = client;
 +        this.client = client.newWatcherRemoveCuratorFramework();
          this.path = PathUtils.validatePath(path);
          this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
+         this.watcher = new SharedValueCuratorWatcher();
+         currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
+     }
+ 
+     @VisibleForTesting
+     protected SharedValue(CuratorFramework client, String path, byte[] seedValue, CuratorWatcher watcher)
+     {
+         this.client = client;
+         this.path = PathUtils.validatePath(path);
+         this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
+         // inject watcher for testing
+         this.watcher = watcher;
          currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
      }
  

http://git-wip-us.apache.org/repos/asf/curator/blob/174faef5/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index 0690d6a,330c8f4..d7ebb6c
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@@ -23,9 -23,9 +23,10 @@@ import com.google.common.collect.Lists
  import com.google.common.util.concurrent.ThreadFactoryBuilder;
  import org.apache.curator.framework.CuratorFramework;
  import org.apache.curator.framework.CuratorFrameworkFactory;
 +import org.apache.curator.framework.imps.TestCleanState;
  import org.apache.curator.framework.api.BackgroundCallback;
  import org.apache.curator.framework.api.CuratorEvent;
+ import org.apache.curator.framework.api.CuratorWatcher;
  import org.apache.curator.framework.state.ConnectionState;
  import org.apache.curator.framework.state.ConnectionStateListener;
  import org.apache.curator.retry.RetryNTimes;
@@@ -424,7 -392,115 +430,115 @@@ public class TestSharedCount extends Ba
          finally
          {
              CloseableUtils.closeQuietly(sharedCount);
 -            CloseableUtils.closeQuietly(curatorFramework);
 +            TestCleanState.closeAndTestClean(curatorFramework);
          }
      }
+ 
+ 
+     @Test
+     public void testDisconnectReconnectWithMultipleClients() throws Exception
+     {
+         CuratorFramework curatorFramework1 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
+         CuratorFramework curatorFramework2 = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500));
+ 
+         curatorFramework1.start();
+         curatorFramework1.blockUntilConnected();
+         curatorFramework2.start();
+         curatorFramework2.blockUntilConnected();
+ 
+         final String sharedCountPath = "/count";
+         final int initialCount = 10;
+         SharedCount sharedCount1 = new SharedCount(curatorFramework1, sharedCountPath, initialCount);
+         SharedCount sharedCountWithFaultyWatcher = createSharedCountWithFaultyWatcher(curatorFramework2, sharedCountPath, initialCount);
+ 
+         class MySharedCountListener implements SharedCountListener
+         {
+             final public Phaser gotSuspendEvent = new Phaser(1);
+             final public Phaser gotChangeEvent = new Phaser(1);
+             final public Phaser getReconnectEvent = new Phaser(1);
+             final public AtomicInteger numChangeEvents = new AtomicInteger(0);
+ 
+             @Override
+             public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception
+             {
+                 numChangeEvents.incrementAndGet();
+                 gotChangeEvent.arrive();
+             }
+ 
+             @Override
+             public void stateChanged(CuratorFramework client, ConnectionState newState)
+             {
+                 if (newState == ConnectionState.SUSPENDED) {
+                     gotSuspendEvent.arrive();
+                 } else if (newState == ConnectionState.RECONNECTED) {
+                     getReconnectEvent.arrive();
+                 }
+             }
+         }
+ 
+         MySharedCountListener listener1 = new MySharedCountListener();
+         sharedCount1.addListener(listener1);
+         sharedCount1.start();
+         MySharedCountListener listener2 = new MySharedCountListener();
+         sharedCountWithFaultyWatcher.addListener(listener2);
+ 
+         try
+         {
+             sharedCount1.setCount(12);
+             Assert.assertEquals(listener1.gotChangeEvent.awaitAdvanceInterruptibly(0, 2, TimeUnit.SECONDS), 1);
+             Assert.assertEquals(sharedCount1.getCount(), 12);
+ 
+             Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 10);
+             // start the second counter, whose injected watcher ignores every event
+             sharedCountWithFaultyWatcher.start();
+ 
+             for (int i = 0; i < 10; i++) {
+                 sharedCount1.setCount(13 + i);
+                 Assert.assertEquals(sharedCount1.getCount(), 13 + i);
+ 
+                 server.restart();
+ 
+                 Assert.assertEquals(listener2.getReconnectEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1);
+                 // CURATOR-311 makes the Curator client re-read the server's shared count value
+                 // when the client's state becomes ConnectionState.RECONNECTED. The following assertions verify that.
+                 Assert.assertEquals(listener2.gotChangeEvent.awaitAdvanceInterruptibly(i, 2, TimeUnit.SECONDS), i + 1);
+                 Assert.assertEquals(sharedCountWithFaultyWatcher.getCount(), 13 + i);
+             }
+         }
+         finally
+         {
+             CloseableUtils.closeQuietly(sharedCount1);
+             CloseableUtils.closeQuietly(curatorFramework1);
+             CloseableUtils.closeQuietly(sharedCountWithFaultyWatcher);
+             CloseableUtils.closeQuietly(curatorFramework2);
+         }
+     }
+ 
+     private SharedCount createSharedCountWithFaultyWatcher(CuratorFramework curatorFramework, String path, int val) {
+ 
+         class FaultyCuratorWatcher implements CuratorWatcher {
+             @Override
+             public void process(WatchedEvent event) throws Exception {
+                 // everything will be ignored
+             }
+         }
+ 
+         final FaultyCuratorWatcher fautlyWatcher = new FaultyCuratorWatcher();
+ 
+         class FaultySharedValue extends SharedValue {
+             public FaultySharedValue(CuratorFramework client, String path, byte[] seedValue) {
+                 super(client, path, seedValue, fautlyWatcher);
+             }
+         };
+ 
+         final SharedValue faultySharedValue = new FaultySharedValue(curatorFramework, path, SharedCount.toBytes(val));
+         class FaultySharedCount extends SharedCount {
+             public FaultySharedCount(CuratorFramework client, String path, int val) {
+                 super(client, path, faultySharedValue);
+             }
+         };
+         return new FaultySharedCount(curatorFramework, path, val);
+     }
+ 
+ 
  }