You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/02/11 12:47:05 UTC
[curator] 02/02: CURATOR-505 - Some refactoring and more doc
This is an automated email from the ASF dual-hosted git repository.
randgalt pushed a commit to branch CURATOR-505
in repository https://gitbox.apache.org/repos/asf/curator.git
commit b18af513b4851c8ecb9486a957971d7dfa179ceb
Author: randgalt <ra...@apache.org>
AuthorDate: Mon Feb 11 07:46:55 2019 -0500
CURATOR-505 - Some refactoring and more doc
---
.../framework/listen/MappingListenerManager.java | 13 ---
.../framework/listen/StandardListenerManager.java | 14 ++-
.../framework/state/ConnectionStateManager.java | 3 +-
.../cache/TestPathChildrenCacheInCluster.java | 106 ++++++++++++++++++++-
.../org/apache/curator/test/BaseClassForTests.java | 5 +
5 files changed, 123 insertions(+), 18 deletions(-)
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java
index f230da9..bd9f51a 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
-import java.util.function.UnaryOperator;
/**
* Upgraded version of {@link org.apache.curator.framework.listen.ListenerContainer} that
@@ -39,18 +38,6 @@ public class MappingListenerManager<K, V> implements ListenerManager<K, V>
private final Function<K, V> mapper;
/**
- * Returns a new mapping container that maps to the same type
- *
- * @param mapper listener mapper/wrapper
- * @return new container
- */
- public static <T> StandardListenerManager<T> mappingStandard(UnaryOperator<T> mapper)
- {
- MappingListenerManager<T, T> container = new MappingListenerManager<>(mapper);
- return new StandardListenerManager<>(container);
- }
-
- /**
* Returns a new container that wraps listeners using the given mapper
*
* @param mapper listener mapper/wrapper
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java
index 8d239ca..e07fe47 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java
@@ -18,10 +18,10 @@
*/
package org.apache.curator.framework.listen;
-import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.function.UnaryOperator;
/**
* Non mapping version of a listener container
@@ -41,6 +41,18 @@ public class StandardListenerManager<T> implements ListenerManager<T, T>
return new StandardListenerManager<>(container);
}
+    /**
+     * Builds a {@link StandardListenerManager} backed by a {@link MappingListenerManager}
+     * whose key and value types are both {@code T}, constructed from the given mapper.
+     *
+     * @param mapper listener mapper/wrapper
+     * @return new container
+     */
+    public static <T> StandardListenerManager<T> mappingStandard(UnaryOperator<T> mapper)
+    {
+        return new StandardListenerManager<>(new MappingListenerManager<T, T>(mapper));
+    }
+
@Override
public void addListener(T listener)
{
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 583b9f2..55e17c8 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -22,7 +22,6 @@ package org.apache.curator.framework.state;
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.listen.MappingListenerManager;
import org.apache.curator.framework.listen.StandardListenerManager;
import org.apache.curator.utils.Compatibility;
import org.apache.curator.utils.ThreadUtils;
@@ -114,7 +113,7 @@ public class ConnectionStateManager implements Closeable
threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager");
}
service = Executors.newSingleThreadExecutor(threadFactory);
- listeners = MappingListenerManager.mappingStandard(listener -> listener.doNotDecorate() ? listener : connectionStateListenerDecorator.decorateListener(client, listener));
+ listeners = StandardListenerManager.mappingStandard(listener -> listener.doNotDecorate() ? listener : connectionStateListenerDecorator.decorateListener(client, listener));
}
/**
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java
index cd87125..8dae57b 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java
@@ -19,23 +19,125 @@
package org.apache.curator.framework.recipes.cache;
import com.google.common.collect.Queues;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionStateListenerDecorator;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.retry.RetryForever;
import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.TestingCluster;
import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
+import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class TestPathChildrenCacheInCluster extends BaseClassForTests
{
+ /**
+ * Overrides the {@link BaseClassForTests} server-creation hook with a no-op.
+ * NOTE(review): presumably this is because the tests in this class start their
+ * own {@link TestingCluster} rather than using the base class server - confirm.
+ */
+ @Override
+ protected void createServer()
+ {
+ // intentionally empty - no server is created by this override
+ }
+
+    /**
+     * Exercises a {@link PathChildrenCache} whose client is built with the circuit-breaking
+     * {@link ConnectionStateListenerDecorator} while the server that client connects to is
+     * repeatedly killed and restarted.
+     * <p>
+     * A second, undecorated client connected to a different cluster instance modifies the
+     * children of {@code /test} while the bouncing is in progress. Once the bouncing has
+     * finished the test asserts that the cache holds the final data for every node and that
+     * {@code refresh()} was invoked exactly twice.
+     * NOTE(review): the expected count of 2 presumably corresponds to the initial build plus
+     * a single refresh after the connection storm - confirm against PathChildrenCache.
+     *
+     * @throws Exception on any cluster, client or cache failure
+     */
+    @Test
+    public void testWithCircuitBreaker() throws Exception
+    {
+        Timing timing = new Timing();
+        try ( TestingCluster cluster = new TestingCluster(3) )
+        {
+            cluster.start();
+
+            ConnectionStateListenerDecorator decorator = ConnectionStateListenerDecorator.circuitBreaking(new RetryForever(timing.multiple(2).milliseconds()));
+            Iterator<InstanceSpec> iterator = cluster.getInstances().iterator();
+            InstanceSpec client1Instance = iterator.next();
+            InstanceSpec client2Instance = iterator.next();
+            ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(100, 3);
+            try (CuratorFramework client1 = CuratorFrameworkFactory.
+                builder()
+                .connectString(client1Instance.getConnectString())
+                .retryPolicy(exponentialBackoffRetry)
+                .sessionTimeoutMs(timing.session())
+                .connectionTimeoutMs(timing.connection())
+                .connectionStateListenerDecorator(decorator)
+                .build()
+            )
+            {
+                client1.start();
+
+                try ( CuratorFramework client2 = CuratorFrameworkFactory.newClient(client2Instance.getConnectString(), timing.session(), timing.connection(), exponentialBackoffRetry) )
+                {
+                    client2.start();
+
+                    // counts every refresh() call made by the cache under test
+                    AtomicInteger refreshCount = new AtomicInteger(0);
+                    try ( PathChildrenCache cache = new PathChildrenCache(client1, "/test", true) {
+                        @Override
+                        void refresh(RefreshMode mode) throws Exception
+                        {
+                            refreshCount.incrementAndGet();
+                            super.refresh(mode);
+                        }
+                    } )
+                    {
+                        cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+
+                        client2.create().forPath("/test/1", "one".getBytes());
+                        client2.create().forPath("/test/2", "two".getBytes());
+                        client2.create().forPath("/test/3", "three".getBytes());
+
+                        // The executor is held in a local (fully qualified to avoid a new import)
+                        // so that its worker thread can be shut down even when the test fails.
+                        java.util.concurrent.ExecutorService bounceExecutor = Executors.newSingleThreadExecutor();
+                        try
+                        {
+                            // bounce the server client1 is connected to while client2 keeps writing
+                            Future<?> task = bounceExecutor.submit(() -> {
+                                try
+                                {
+                                    for ( int i = 0; i < 5; ++i )
+                                    {
+                                        cluster.killServer(client1Instance);
+                                        cluster.restartServer(client1Instance);
+                                        timing.sleepABit();
+                                    }
+                                }
+                                catch ( Exception e )
+                                {
+                                    // best effort - a failed bounce is reported but does not abort the test
+                                    e.printStackTrace();
+                                }
+                            });
+
+                            client2.create().forPath("/test/4", "four".getBytes());
+                            client2.create().forPath("/test/5", "five".getBytes());
+                            client2.delete().forPath("/test/4");
+                            client2.setData().forPath("/test/1", "1".getBytes());
+                            client2.create().forPath("/test/6", "six".getBytes());
+
+                            task.get();
+                        }
+                        finally
+                        {
+                            // stops the bouncing (if still running) before the cache/clients/cluster close
+                            bounceExecutor.shutdownNow();
+                        }
+                        timing.sleepABit();
+
+                        Assert.assertNotNull(cache.getCurrentData("/test/1"));
+                        Assert.assertEquals(cache.getCurrentData("/test/1").getData(), "1".getBytes());
+                        Assert.assertNotNull(cache.getCurrentData("/test/2"));
+                        Assert.assertEquals(cache.getCurrentData("/test/2").getData(), "two".getBytes());
+                        Assert.assertNotNull(cache.getCurrentData("/test/3"));
+                        Assert.assertEquals(cache.getCurrentData("/test/3").getData(), "three".getBytes());
+                        Assert.assertNull(cache.getCurrentData("/test/4"));
+                        Assert.assertNotNull(cache.getCurrentData("/test/5"));
+                        Assert.assertEquals(cache.getCurrentData("/test/5").getData(), "five".getBytes());
+                        Assert.assertNotNull(cache.getCurrentData("/test/6"));
+                        Assert.assertEquals(cache.getCurrentData("/test/6").getData(), "six".getBytes());
+
+                        Assert.assertEquals(refreshCount.get(), 2);
+                    }
+                }
+            }
+        }
+    }
+
@Test(enabled = false) // this test is very flakey - it needs to be re-written at some point
public void testMissedDelete() throws Exception
{
diff --git a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
index 9ae6a5d..f932ae4 100644
--- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
+++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
@@ -98,6 +98,11 @@ public class BaseClassForTests
System.setProperty(INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND, "true");
System.setProperty(INTERNAL_PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY, "true");
+ createServer();
+ }
+
+ protected void createServer() throws Exception
+ {
while ( server == null )
{
try