You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/02/11 12:47:03 UTC

[curator] branch CURATOR-505 updated (c9e3cf6 -> b18af51)

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a change to branch CURATOR-505
in repository https://gitbox.apache.org/repos/asf/curator.git.


    from c9e3cf6  CURATOR-505 - refactoring/refining a new listener container that doesn't rely on Guava and supports mapping. We need it for this PR anyway.
     new 7458a20  CURATOR-505 ctor should be private
     new b18af51  CURATOR-505 - Some refactoring and more doc

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../framework/listen/MappingListenerManager.java   |  13 ---
 .../framework/listen/StandardListenerManager.java  |  18 +++-
 .../framework/state/ConnectionStateManager.java    |   3 +-
 .../cache/TestPathChildrenCacheInCluster.java      | 106 ++++++++++++++++++++-
 .../org/apache/curator/test/BaseClassForTests.java |   5 +
 5 files changed, 125 insertions(+), 20 deletions(-)


[curator] 02/02: CURATOR-505 - Some refactoring and more doc

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch CURATOR-505
in repository https://gitbox.apache.org/repos/asf/curator.git

commit b18af513b4851c8ecb9486a957971d7dfa179ceb
Author: randgalt <ra...@apache.org>
AuthorDate: Mon Feb 11 07:46:55 2019 -0500

    CURATOR-505 - Some refactoring and more doc
---
 .../framework/listen/MappingListenerManager.java   |  13 ---
 .../framework/listen/StandardListenerManager.java  |  14 ++-
 .../framework/state/ConnectionStateManager.java    |   3 +-
 .../cache/TestPathChildrenCacheInCluster.java      | 106 ++++++++++++++++++++-
 .../org/apache/curator/test/BaseClassForTests.java |   5 +
 5 files changed, 123 insertions(+), 18 deletions(-)

diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java
index f230da9..bd9f51a 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import java.util.function.Function;
-import java.util.function.UnaryOperator;
 
 /**
  * Upgraded version of {@link org.apache.curator.framework.listen.ListenerContainer} that
@@ -39,18 +38,6 @@ public class MappingListenerManager<K, V> implements ListenerManager<K, V>
     private final Function<K, V> mapper;
 
     /**
-     * Returns a new mapping container that maps to the same type
-     *
-     * @param mapper listener mapper/wrapper
-     * @return new container
-     */
-    public static <T> StandardListenerManager<T> mappingStandard(UnaryOperator<T> mapper)
-    {
-        MappingListenerManager<T, T> container = new MappingListenerManager<>(mapper);
-        return new StandardListenerManager<>(container);
-    }
-
-    /**
      * Returns a new container that wraps listeners using the given mapper
      *
      * @param mapper listener mapper/wrapper
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java
index 8d239ca..e07fe47 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java
@@ -18,10 +18,10 @@
  */
 package org.apache.curator.framework.listen;
 
-import java.util.Objects;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.UnaryOperator;
 
 /**
  * Non mapping version of a listener container
@@ -41,6 +41,18 @@ public class StandardListenerManager<T> implements ListenerManager<T, T>
         return new StandardListenerManager<>(container);
     }
 
+    /**
+     * Returns a new mapping container that maps to the same type
+     *
+     * @param mapper listener mapper/wrapper
+     * @return new container
+     */
+    public static <T> StandardListenerManager<T> mappingStandard(UnaryOperator<T> mapper)
+    {
+        MappingListenerManager<T, T> container = new MappingListenerManager<>(mapper);
+        return new StandardListenerManager<>(container);
+    }
+
     @Override
     public void addListener(T listener)
     {
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 583b9f2..55e17c8 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -22,7 +22,6 @@ package org.apache.curator.framework.state;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.listen.MappingListenerManager;
 import org.apache.curator.framework.listen.StandardListenerManager;
 import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.ThreadUtils;
@@ -114,7 +113,7 @@ public class ConnectionStateManager implements Closeable
             threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager");
         }
         service = Executors.newSingleThreadExecutor(threadFactory);
-        listeners = MappingListenerManager.mappingStandard(listener -> listener.doNotDecorate() ? listener : connectionStateListenerDecorator.decorateListener(client, listener));
+        listeners = StandardListenerManager.mappingStandard(listener -> listener.doNotDecorate() ? listener : connectionStateListenerDecorator.decorateListener(client, listener));
     }
 
     /**
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java
index cd87125..8dae57b 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java
@@ -19,23 +19,125 @@
 package org.apache.curator.framework.recipes.cache;
 
 import com.google.common.collect.Queues;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionStateListenerDecorator;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.retry.RetryForever;
 import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.InstanceSpec;
 import org.apache.curator.test.TestingCluster;
 import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
+import java.util.Iterator;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class TestPathChildrenCacheInCluster extends BaseClassForTests
 {
+    @Override
+    protected void createServer()
+    {
+        // do nothing
+    }
+
+    @Test
+    public void testWithCircuitBreaker() throws Exception
+    {
+        Timing timing = new Timing();
+        try ( TestingCluster cluster = new TestingCluster(3) )
+        {
+            cluster.start();
+
+            ConnectionStateListenerDecorator decorator = ConnectionStateListenerDecorator.circuitBreaking(new RetryForever(timing.multiple(2).milliseconds()));
+            Iterator<InstanceSpec> iterator = cluster.getInstances().iterator();
+            InstanceSpec client1Instance = iterator.next();
+            InstanceSpec client2Instance = iterator.next();
+            ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(100, 3);
+            try (CuratorFramework client1 = CuratorFrameworkFactory.
+                builder()
+                .connectString(client1Instance.getConnectString())
+                .retryPolicy(exponentialBackoffRetry)
+                .sessionTimeoutMs(timing.session())
+                .connectionTimeoutMs(timing.connection())
+                .connectionStateListenerDecorator(decorator)
+                .build()
+            )
+            {
+                client1.start();
+
+                try ( CuratorFramework client2 = CuratorFrameworkFactory.newClient(client2Instance.getConnectString(), timing.session(), timing.connection(), exponentialBackoffRetry) )
+                {
+                    client2.start();
+
+                    AtomicInteger refreshCount = new AtomicInteger(0);
+                    try ( PathChildrenCache cache = new PathChildrenCache(client1, "/test", true) {
+                        @Override
+                        void refresh(RefreshMode mode) throws Exception
+                        {
+                            refreshCount.incrementAndGet();
+                            super.refresh(mode);
+                        }
+                    } )
+                    {
+                        cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+
+                        client2.create().forPath("/test/1", "one".getBytes());
+                        client2.create().forPath("/test/2", "two".getBytes());
+                        client2.create().forPath("/test/3", "three".getBytes());
+
+                        Future<?> task = Executors.newSingleThreadExecutor().submit(() -> {
+                            try
+                            {
+                                for ( int i = 0; i < 5; ++i )
+                                {
+                                    cluster.killServer(client1Instance);
+                                    cluster.restartServer(client1Instance);
+                                    timing.sleepABit();
+                                }
+                            }
+                            catch ( Exception e )
+                            {
+                                e.printStackTrace();
+                            }
+                        });
+
+                        client2.create().forPath("/test/4", "four".getBytes());
+                        client2.create().forPath("/test/5", "five".getBytes());
+                        client2.delete().forPath("/test/4");
+                        client2.setData().forPath("/test/1", "1".getBytes());
+                        client2.create().forPath("/test/6", "six".getBytes());
+
+                        task.get();
+                        timing.sleepABit();
+
+                        Assert.assertNotNull(cache.getCurrentData("/test/1"));
+                        Assert.assertEquals(cache.getCurrentData("/test/1").getData(), "1".getBytes());
+                        Assert.assertNotNull(cache.getCurrentData("/test/2"));
+                        Assert.assertEquals(cache.getCurrentData("/test/2").getData(), "two".getBytes());
+                        Assert.assertNotNull(cache.getCurrentData("/test/3"));
+                        Assert.assertEquals(cache.getCurrentData("/test/3").getData(), "three".getBytes());
+                        Assert.assertNull(cache.getCurrentData("/test/4"));
+                        Assert.assertNotNull(cache.getCurrentData("/test/5"));
+                        Assert.assertEquals(cache.getCurrentData("/test/5").getData(), "five".getBytes());
+                        Assert.assertNotNull(cache.getCurrentData("/test/6"));
+                        Assert.assertEquals(cache.getCurrentData("/test/6").getData(), "six".getBytes());
+
+                        Assert.assertEquals(refreshCount.get(), 2);
+                    }
+                }
+            }
+        }
+    }
+
     @Test(enabled = false)  // this test is very flakey - it needs to be re-written at some point
     public void testMissedDelete() throws Exception
     {
diff --git a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
index 9ae6a5d..f932ae4 100644
--- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
+++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
@@ -98,6 +98,11 @@ public class BaseClassForTests
         System.setProperty(INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND, "true");
         System.setProperty(INTERNAL_PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY, "true");
 
+        createServer();
+    }
+
+    protected void createServer() throws Exception
+    {
         while ( server == null )
         {
             try


[curator] 01/02: CURATOR-505 ctor should be private

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch CURATOR-505
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 7458a20e8320396b33a8389e6bba04d503d23430
Author: randgalt <ra...@apache.org>
AuthorDate: Thu Feb 7 15:46:02 2019 -0500

    CURATOR-505 ctor should be private
---
 .../curator/framework/listen/StandardListenerManager.java      | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java
index 8b60ac1..8d239ca 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java
@@ -41,11 +41,6 @@ public class StandardListenerManager<T> implements ListenerManager<T, T>
         return new StandardListenerManager<>(container);
     }
 
-    public StandardListenerManager(ListenerManager<T, T> container)
-    {
-        this.container = Objects.requireNonNull(container, "container cannot be null");
-    }
-
     @Override
     public void addListener(T listener)
     {
@@ -81,4 +76,9 @@ public class StandardListenerManager<T> implements ListenerManager<T, T>
     {
         container.forEach(function);
     }
+
+    private StandardListenerManager(ListenerManager<T, T> container)
+    {
+        this.container = container;
+    }
 }