You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/02/11 12:47:03 UTC
[curator] branch CURATOR-505 updated (c9e3cf6 -> b18af51)
This is an automated email from the ASF dual-hosted git repository.
randgalt pushed a change to branch CURATOR-505
in repository https://gitbox.apache.org/repos/asf/curator.git.
from c9e3cf6 CURATOR-505 - refactoring/refining a new listener container that doesn't rely on Guava and supports mapping. We need it for this PR anyway.
new 7458a20 CURATOR-505 ctor should be private
new b18af51 CURATOR-505 - Some refactoring and more doc
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../framework/listen/MappingListenerManager.java | 13 ---
.../framework/listen/StandardListenerManager.java | 18 +++-
.../framework/state/ConnectionStateManager.java | 3 +-
.../cache/TestPathChildrenCacheInCluster.java | 106 ++++++++++++++++++++-
.../org/apache/curator/test/BaseClassForTests.java | 5 +
5 files changed, 125 insertions(+), 20 deletions(-)
[curator] 02/02: CURATOR-505 - Some refactoring and more doc
Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
randgalt pushed a commit to branch CURATOR-505
in repository https://gitbox.apache.org/repos/asf/curator.git
commit b18af513b4851c8ecb9486a957971d7dfa179ceb
Author: randgalt <ra...@apache.org>
AuthorDate: Mon Feb 11 07:46:55 2019 -0500
CURATOR-505 - Some refactoring and more doc
---
.../framework/listen/MappingListenerManager.java | 13 ---
.../framework/listen/StandardListenerManager.java | 14 ++-
.../framework/state/ConnectionStateManager.java | 3 +-
.../cache/TestPathChildrenCacheInCluster.java | 106 ++++++++++++++++++++-
.../org/apache/curator/test/BaseClassForTests.java | 5 +
5 files changed, 123 insertions(+), 18 deletions(-)
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java
index f230da9..bd9f51a 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
-import java.util.function.UnaryOperator;
/**
* Upgraded version of {@link org.apache.curator.framework.listen.ListenerContainer} that
@@ -39,18 +38,6 @@ public class MappingListenerManager<K, V> implements ListenerManager<K, V>
private final Function<K, V> mapper;
/**
- * Returns a new mapping container that maps to the same type
- *
- * @param mapper listener mapper/wrapper
- * @return new container
- */
- public static <T> StandardListenerManager<T> mappingStandard(UnaryOperator<T> mapper)
- {
- MappingListenerManager<T, T> container = new MappingListenerManager<>(mapper);
- return new StandardListenerManager<>(container);
- }
-
- /**
* Returns a new container that wraps listeners using the given mapper
*
* @param mapper listener mapper/wrapper
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java
index 8d239ca..e07fe47 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java
@@ -18,10 +18,10 @@
*/
package org.apache.curator.framework.listen;
-import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.function.UnaryOperator;
/**
* Non mapping version of a listener container
@@ -41,6 +41,18 @@ public class StandardListenerManager<T> implements ListenerManager<T, T>
return new StandardListenerManager<>(container);
}
+ /**
+ * Returns a new mapping container that maps to the same type
+ *
+ * @param mapper listener mapper/wrapper
+ * @return new container
+ */
+ public static <T> StandardListenerManager<T> mappingStandard(UnaryOperator<T> mapper)
+ {
+ MappingListenerManager<T, T> container = new MappingListenerManager<>(mapper);
+ return new StandardListenerManager<>(container);
+ }
+
@Override
public void addListener(T listener)
{
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 583b9f2..55e17c8 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -22,7 +22,6 @@ package org.apache.curator.framework.state;
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.listen.MappingListenerManager;
import org.apache.curator.framework.listen.StandardListenerManager;
import org.apache.curator.utils.Compatibility;
import org.apache.curator.utils.ThreadUtils;
@@ -114,7 +113,7 @@ public class ConnectionStateManager implements Closeable
threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager");
}
service = Executors.newSingleThreadExecutor(threadFactory);
- listeners = MappingListenerManager.mappingStandard(listener -> listener.doNotDecorate() ? listener : connectionStateListenerDecorator.decorateListener(client, listener));
+ listeners = StandardListenerManager.mappingStandard(listener -> listener.doNotDecorate() ? listener : connectionStateListenerDecorator.decorateListener(client, listener));
}
/**
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java
index cd87125..8dae57b 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java
@@ -19,23 +19,125 @@
package org.apache.curator.framework.recipes.cache;
import com.google.common.collect.Queues;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionStateListenerDecorator;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.retry.RetryForever;
import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.TestingCluster;
import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
+import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class TestPathChildrenCacheInCluster extends BaseClassForTests
{
+ @Override
+ protected void createServer()
+ {
+ // do nothing
+ }
+
+ @Test
+ public void testWithCircuitBreaker() throws Exception
+ {
+ Timing timing = new Timing();
+ try ( TestingCluster cluster = new TestingCluster(3) )
+ {
+ cluster.start();
+
+ ConnectionStateListenerDecorator decorator = ConnectionStateListenerDecorator.circuitBreaking(new RetryForever(timing.multiple(2).milliseconds()));
+ Iterator<InstanceSpec> iterator = cluster.getInstances().iterator();
+ InstanceSpec client1Instance = iterator.next();
+ InstanceSpec client2Instance = iterator.next();
+ ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(100, 3);
+ try (CuratorFramework client1 = CuratorFrameworkFactory.
+ builder()
+ .connectString(client1Instance.getConnectString())
+ .retryPolicy(exponentialBackoffRetry)
+ .sessionTimeoutMs(timing.session())
+ .connectionTimeoutMs(timing.connection())
+ .connectionStateListenerDecorator(decorator)
+ .build()
+ )
+ {
+ client1.start();
+
+ try ( CuratorFramework client2 = CuratorFrameworkFactory.newClient(client2Instance.getConnectString(), timing.session(), timing.connection(), exponentialBackoffRetry) )
+ {
+ client2.start();
+
+ AtomicInteger refreshCount = new AtomicInteger(0);
+ try ( PathChildrenCache cache = new PathChildrenCache(client1, "/test", true) {
+ @Override
+ void refresh(RefreshMode mode) throws Exception
+ {
+ refreshCount.incrementAndGet();
+ super.refresh(mode);
+ }
+ } )
+ {
+ cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+
+ client2.create().forPath("/test/1", "one".getBytes());
+ client2.create().forPath("/test/2", "two".getBytes());
+ client2.create().forPath("/test/3", "three".getBytes());
+
+ Future<?> task = Executors.newSingleThreadExecutor().submit(() -> {
+ try
+ {
+ for ( int i = 0; i < 5; ++i )
+ {
+ cluster.killServer(client1Instance);
+ cluster.restartServer(client1Instance);
+ timing.sleepABit();
+ }
+ }
+ catch ( Exception e )
+ {
+ e.printStackTrace();
+ }
+ });
+
+ client2.create().forPath("/test/4", "four".getBytes());
+ client2.create().forPath("/test/5", "five".getBytes());
+ client2.delete().forPath("/test/4");
+ client2.setData().forPath("/test/1", "1".getBytes());
+ client2.create().forPath("/test/6", "six".getBytes());
+
+ task.get();
+ timing.sleepABit();
+
+ Assert.assertNotNull(cache.getCurrentData("/test/1"));
+ Assert.assertEquals(cache.getCurrentData("/test/1").getData(), "1".getBytes());
+ Assert.assertNotNull(cache.getCurrentData("/test/2"));
+ Assert.assertEquals(cache.getCurrentData("/test/2").getData(), "two".getBytes());
+ Assert.assertNotNull(cache.getCurrentData("/test/3"));
+ Assert.assertEquals(cache.getCurrentData("/test/3").getData(), "three".getBytes());
+ Assert.assertNull(cache.getCurrentData("/test/4"));
+ Assert.assertNotNull(cache.getCurrentData("/test/5"));
+ Assert.assertEquals(cache.getCurrentData("/test/5").getData(), "five".getBytes());
+ Assert.assertNotNull(cache.getCurrentData("/test/6"));
+ Assert.assertEquals(cache.getCurrentData("/test/6").getData(), "six".getBytes());
+
+ Assert.assertEquals(refreshCount.get(), 2);
+ }
+ }
+ }
+ }
+ }
+
@Test(enabled = false) // this test is very flakey - it needs to be re-written at some point
public void testMissedDelete() throws Exception
{
diff --git a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
index 9ae6a5d..f932ae4 100644
--- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
+++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
@@ -98,6 +98,11 @@ public class BaseClassForTests
System.setProperty(INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND, "true");
System.setProperty(INTERNAL_PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY, "true");
+ createServer();
+ }
+
+ protected void createServer() throws Exception
+ {
while ( server == null )
{
try
[curator] 01/02: CURATOR-505 ctor should be private
Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
randgalt pushed a commit to branch CURATOR-505
in repository https://gitbox.apache.org/repos/asf/curator.git
commit 7458a20e8320396b33a8389e6bba04d503d23430
Author: randgalt <ra...@apache.org>
AuthorDate: Thu Feb 7 15:46:02 2019 -0500
CURATOR-505 ctor should be private
---
.../curator/framework/listen/StandardListenerManager.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java
index 8b60ac1..8d239ca 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java
@@ -41,11 +41,6 @@ public class StandardListenerManager<T> implements ListenerManager<T, T>
return new StandardListenerManager<>(container);
}
- public StandardListenerManager(ListenerManager<T, T> container)
- {
- this.container = Objects.requireNonNull(container, "container cannot be null");
- }
-
@Override
public void addListener(T listener)
{
@@ -81,4 +76,9 @@ public class StandardListenerManager<T> implements ListenerManager<T, T>
{
container.forEach(function);
}
+
+ private StandardListenerManager(ListenerManager<T, T> container)
+ {
+ this.container = container;
+ }
}