You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2015/08/24 19:32:08 UTC
[1/3] curator git commit: Initial error policy with two
implementations. Also, applied it to LeaderSelector as a test
Repository: curator
Updated Branches:
refs/heads/CURATOR-248 [created] 94dff8a5a
Initial error policy with two implementations. Also, applied it to LeaderSelector as a test
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/45df7ba7
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/45df7ba7
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/45df7ba7
Branch: refs/heads/CURATOR-248
Commit: 45df7ba71f14a5f9751061a7dff956312bfdd421
Parents: f9af0ce
Author: randgalt <ra...@apache.org>
Authored: Mon Aug 24 12:24:06 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Aug 24 12:24:06 2015 -0500
----------------------------------------------------------------------
.../curator/framework/CuratorFramework.java | 8 ++
.../framework/CuratorFrameworkFactory.java | 20 +++++
.../framework/imps/CuratorFrameworkImpl.java | 10 +++
.../curator/framework/state/ErrorPolicy.java | 18 ++++
.../framework/state/SessionErrorPolicy.java | 13 +++
.../framework/state/StandardErrorPolicy.java | 14 +++
.../leader/LeaderSelectorListenerAdapter.java | 2 +-
.../recipes/leader/TestLeaderSelector.java | 90 ++++++++++++++++++++
8 files changed, 174 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/45df7ba7/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 58c5bf5..d755d28 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -28,6 +28,7 @@ import org.apache.curator.framework.api.transaction.TransactionOp;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.framework.state.ErrorPolicy;
import org.apache.curator.utils.EnsurePath;
import org.apache.zookeeper.Watcher;
@@ -297,4 +298,11 @@ public interface CuratorFramework extends Closeable
* @return facade
*/
public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework();
+
+ /**
+ * Return the configured error policy. Recipes are expected to consult it to decide which connection state changes to treat as errors.
+ *
+ * @return error policy
+ */
+ public ErrorPolicy getErrorPolicy();
}
http://git-wip-us.apache.org/repos/asf/curator/blob/45df7ba7/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index dcb2ee6..aa5181d 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -31,6 +31,8 @@ import org.apache.curator.framework.imps.CuratorFrameworkImpl;
import org.apache.curator.framework.imps.CuratorTempFrameworkImpl;
import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.curator.framework.imps.GzipCompressionProvider;
+import org.apache.curator.framework.state.ErrorPolicy;
+import org.apache.curator.framework.state.StandardErrorPolicy;
import org.apache.curator.utils.DefaultZookeeperFactory;
import org.apache.curator.utils.ZookeeperFactory;
import org.apache.zookeeper.CreateMode;
@@ -116,6 +118,7 @@ public class CuratorFrameworkFactory
private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER;
private boolean canBeReadOnly = false;
private boolean useContainerParentsIfAvailable = true;
+ private ErrorPolicy errorPolicy = new StandardErrorPolicy();
/**
* Apply the current values and build a new CuratorFramework
@@ -343,6 +346,18 @@ public class CuratorFrameworkFactory
return this;
}
+ /**
+ * Set the error policy to use. The default is {@link StandardErrorPolicy}
+ *
+ * @param errorPolicy new error policy
+ * @return this
+ */
+ public Builder errorPolicy(ErrorPolicy errorPolicy)
+ {
+ // no null check here; the CuratorFrameworkImpl constructor that reads this builder rejects a null policy
+ this.errorPolicy = errorPolicy;
+ return this;
+ }
+
public ACLProvider getAclProvider()
{
return aclProvider;
@@ -398,6 +413,11 @@ public class CuratorFrameworkFactory
return useContainerParentsIfAvailable;
}
+ /**
+ * Return the error policy currently set on this builder. This is a
+ * {@link StandardErrorPolicy} unless one was supplied via {@link #errorPolicy(ErrorPolicy)}.
+ *
+ * @return error policy
+ */
+ public ErrorPolicy getErrorPolicy()
+ {
+ return errorPolicy;
+ }
+
@Deprecated
public String getAuthScheme()
{
http://git-wip-us.apache.org/repos/asf/curator/blob/45df7ba7/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 41bb7cd..3310daf 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -40,6 +40,7 @@ import org.apache.curator.framework.listen.ListenerContainer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.framework.state.ConnectionStateManager;
+import org.apache.curator.framework.state.ErrorPolicy;
import org.apache.curator.utils.DebugUtils;
import org.apache.curator.utils.EnsurePath;
import org.apache.curator.utils.ThreadUtils;
@@ -83,6 +84,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
private final NamespaceFacadeCache namespaceFacadeCache;
private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this);
private final boolean useContainerParentsIfAvailable;
+ private final ErrorPolicy errorPolicy;
private volatile ExecutorService executorService;
private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
@@ -124,6 +126,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
aclProvider = builder.getAclProvider();
state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable();
+ errorPolicy = Preconditions.checkNotNull(builder.getErrorPolicy(), "errorPolicy cannot be null");
byte[] builderDefaultData = builder.getDefaultData();
defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];
@@ -197,6 +200,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
state = parent.state;
authInfos = parent.authInfos;
useContainerParentsIfAvailable = parent.useContainerParentsIfAvailable;
+ errorPolicy = parent.errorPolicy;
}
@Override
@@ -241,6 +245,12 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
@Override
+ public ErrorPolicy getErrorPolicy()
+ {
+ // final field: taken from the builder (null rejected) or copied from the parent instance at construction
+ return errorPolicy;
+ }
+
+ @Override
public void start()
{
log.info("Starting");
http://git-wip-us.apache.org/repos/asf/curator/blob/45df7ba7/curator-framework/src/main/java/org/apache/curator/framework/state/ErrorPolicy.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ErrorPolicy.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ErrorPolicy.java
new file mode 100644
index 0000000..0e1bfb5
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ErrorPolicy.java
@@ -0,0 +1,18 @@
+package org.apache.curator.framework.state;
+
+/**
+ * Recipes should use the configured error policy to decide how to handle
+ * errors such as {@link ConnectionState} changes.
+ */
+public interface ErrorPolicy
+{
+ /**
+ * Returns true if the given state should cause the recipe to
+ * act as though the connection has been lost, e.g. locks should
+ * exit, etc.
+ *
+ * @param state the connection state to evaluate
+ * @return true if the state is to be treated as an error, false otherwise
+ */
+ boolean isErrorState(ConnectionState state);
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/45df7ba7/curator-framework/src/main/java/org/apache/curator/framework/state/SessionErrorPolicy.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/SessionErrorPolicy.java b/curator-framework/src/main/java/org/apache/curator/framework/state/SessionErrorPolicy.java
new file mode 100644
index 0000000..3f68fe4
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/SessionErrorPolicy.java
@@ -0,0 +1,13 @@
+package org.apache.curator.framework.state;
+
+/**
+ * This policy treats only {@link ConnectionState#LOST} as an error
+ */
+public class SessionErrorPolicy implements ErrorPolicy
+{
+ /**
+ * @param state the connection state to evaluate
+ * @return true only when the state is {@link ConnectionState#LOST}; every other state, including
+ * {@link ConnectionState#SUSPENDED}, yields false
+ */
+ @Override
+ public boolean isErrorState(ConnectionState state)
+ {
+ return state == ConnectionState.LOST;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/45df7ba7/curator-framework/src/main/java/org/apache/curator/framework/state/StandardErrorPolicy.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/StandardErrorPolicy.java b/curator-framework/src/main/java/org/apache/curator/framework/state/StandardErrorPolicy.java
new file mode 100644
index 0000000..ea0c668
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/StandardErrorPolicy.java
@@ -0,0 +1,14 @@
+package org.apache.curator.framework.state;
+
+/**
+ * This policy treats {@link ConnectionState#SUSPENDED} and {@link ConnectionState#LOST}
+ * as errors
+ */
+public class StandardErrorPolicy implements ErrorPolicy
+{
+ /**
+ * @param state the connection state to evaluate
+ * @return true when the state is {@link ConnectionState#SUSPENDED} or {@link ConnectionState#LOST},
+ * false otherwise
+ */
+ @Override
+ public boolean isErrorState(ConnectionState state)
+ {
+ return ((state == ConnectionState.SUSPENDED) || (state == ConnectionState.LOST));
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/45df7ba7/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelectorListenerAdapter.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelectorListenerAdapter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelectorListenerAdapter.java
index 7402fa7..1b0070a 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelectorListenerAdapter.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelectorListenerAdapter.java
@@ -30,7 +30,7 @@ public abstract class LeaderSelectorListenerAdapter implements LeaderSelectorLis
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
- if ( (newState == ConnectionState.SUSPENDED) || (newState == ConnectionState.LOST) )
+ if ( client.getErrorPolicy().isErrorState(newState) )
{
throw new CancelLeadershipException();
}
http://git-wip-us.apache.org/repos/asf/curator/blob/45df7ba7/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
index c7f415c..ae19b3c 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
@@ -20,10 +20,13 @@
package org.apache.curator.framework.recipes.leader;
import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.framework.state.SessionErrorPolicy;
+import org.apache.curator.framework.state.StandardErrorPolicy;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.KillSession;
@@ -50,6 +53,93 @@ public class TestLeaderSelector extends BaseClassForTests
private static final String PATH_NAME = "/one/two/me";
@Test
+ // Runs the same leader-election scenario twice, once per error policy, and checks the
+ // order in which connection state changes and the leadership "release" are observed.
+ public void testErrorPolicies() throws Exception
+ {
+ Timing timing = new Timing();
+ LeaderSelector selector = null;
+ // phase 1 client: StandardErrorPolicy, for which SUSPENDED already counts as an error state
+ CuratorFramework client = CuratorFrameworkFactory
+ .builder()
+ .connectString(server.getConnectString())
+ .connectionTimeoutMs(timing.connection())
+ .sessionTimeoutMs(timing.session())
+ .retryPolicy(new RetryOneTime(1))
+ .errorPolicy(new StandardErrorPolicy())
+ .build();
+ try
+ {
+ // collects connection state names plus the "leader"/"release" markers; the assertions below poll it in order
+ final BlockingQueue<String> changes = Queues.newLinkedBlockingQueue();
+
+ ConnectionStateListener stateListener = new ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ changes.add(newState.name());
+ }
+ };
+ client.getConnectionStateListenable().addListener(stateListener);
+ client.start();
+ LeaderSelectorListener listener = new LeaderSelectorListenerAdapter()
+ {
+ @Override
+ public void takeLeadership(CuratorFramework client) throws Exception
+ {
+ changes.add("leader");
+ try
+ {
+ // joining the current thread never completes on its own, so leadership is held until an interrupt
+ // NOTE(review): presumably LeaderSelector interrupts this thread after the adapter throws CancelLeadershipException - confirm
+ Thread.currentThread().join();
+ }
+ catch ( InterruptedException e )
+ {
+ changes.add("release");
+ Thread.currentThread().interrupt();
+ }
+ }
+ };
+ selector = new LeaderSelector(client, "/test", listener);
+ selector.start();
+
+ // expected with StandardErrorPolicy: leadership is released right after SUSPENDED, before LOST is seen
+ Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+ Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "leader");
+ server.close();
+ Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED.name());
+ Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "release");
+ Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST.name());
+
+ // tear down phase 1 and empty the queue so phase 2 starts with no leftover entries
+ selector.close();
+ client.close();
+ timing.sleepABit();
+ changes.clear();
+
+ // phase 2: new server and a client using SessionErrorPolicy; the same two listeners are reused
+ server = new TestingServer();
+ client = CuratorFrameworkFactory
+ .builder()
+ .connectString(server.getConnectString())
+ .connectionTimeoutMs(timing.connection())
+ .sessionTimeoutMs(timing.session())
+ .retryPolicy(new RetryOneTime(1))
+ .errorPolicy(new SessionErrorPolicy())
+ .build();
+ client.getConnectionStateListenable().addListener(stateListener);
+ client.start();
+ selector = new LeaderSelector(client, "/test", listener);
+ selector.start();
+
+ // expected with SessionErrorPolicy: leadership is kept through SUSPENDED and released only after LOST
+ Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+ Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "leader");
+ server.stop();
+ Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED.name());
+ Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST.name());
+ Assert.assertEquals(changes.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "release");
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(selector);
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
public void testLeaderNodeDeleteOnInterrupt() throws Exception
{
Timing timing = new Timing();
[2/3] curator git commit: doc
Posted by ra...@apache.org.
doc
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2e1e92e1
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2e1e92e1
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2e1e92e1
Branch: refs/heads/CURATOR-248
Commit: 2e1e92e162ef812a9d076695500f0bda8b15b6c5
Parents: 45df7ba
Author: randgalt <ra...@apache.org>
Authored: Mon Aug 24 12:30:33 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Aug 24 12:30:33 2015 -0500
----------------------------------------------------------------------
src/site/confluence/errors.confluence | 7 +++++++
1 file changed, 7 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/2e1e92e1/src/site/confluence/errors.confluence
----------------------------------------------------------------------
diff --git a/src/site/confluence/errors.confluence b/src/site/confluence/errors.confluence
index 28805e0..c9545ac 100644
--- a/src/site/confluence/errors.confluence
+++ b/src/site/confluence/errors.confluence
@@ -31,5 +31,12 @@ appropriate action. These are the possible state changes:
{{UnhandledErrorListener}} is called when a background task, etc. catches an exception. In general, Curator users shouldn't care
about these as they are logged. However, you can listen for them if you choose.
+h2. Error Policy
+
+Curator has a pluggable error policy. The default policy takes the conservative approach of treating the connection states SUSPENDED and LOST the same way.
+That is, when a recipe sees the state change to SUSPENDED, it will assume that the ZooKeeper session is lost and will clean up any watchers, nodes, etc. You can, however,
+choose a more aggressive approach by setting the error policy to treat only LOST (i.e. true session loss) as an error state. Do this in the CuratorFrameworkFactory builder via:
+{{errorPolicy(new SessionErrorPolicy())}}.
+
h2. Recipes
In general, the recipes attempt to deal with errors and connection issues. See the doc for each recipe for details on how it deals with errors.
[3/3] curator git commit: Added since tags
Posted by ra...@apache.org.
Added since tags
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/94dff8a5
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/94dff8a5
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/94dff8a5
Branch: refs/heads/CURATOR-248
Commit: 94dff8a5a2ec336a23c05ebe5cdf7e4b117d3925
Parents: 2e1e92e
Author: randgalt <ra...@apache.org>
Authored: Mon Aug 24 12:31:52 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Aug 24 12:31:52 2015 -0500
----------------------------------------------------------------------
.../java/org/apache/curator/framework/CuratorFrameworkFactory.java | 1 +
.../main/java/org/apache/curator/framework/state/ErrorPolicy.java | 2 ++
2 files changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/94dff8a5/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index aa5181d..9a67684 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -349,6 +349,7 @@ public class CuratorFrameworkFactory
/**
* Set the error policy to use. The default is {@link StandardErrorPolicy}
*
+ * @since 3.0.0
* @param errorPolicy new error policy
* @return this
*/
http://git-wip-us.apache.org/repos/asf/curator/blob/94dff8a5/curator-framework/src/main/java/org/apache/curator/framework/state/ErrorPolicy.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ErrorPolicy.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ErrorPolicy.java
index 0e1bfb5..73fc99d 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ErrorPolicy.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ErrorPolicy.java
@@ -3,6 +3,8 @@ package org.apache.curator.framework.state;
/**
* Recipes should use the configured error policy to decide how to handle
* errors such as {@link ConnectionState} changes.
+ *
+ * @since 3.0.0
*/
public interface ErrorPolicy
{