You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2015/08/25 00:39:50 UTC
curator git commit: Updated LeaderLatch for error policy
Repository: curator
Updated Branches:
refs/heads/CURATOR-248 94dff8a5a -> 5429a217b
Updated LeaderLatch for error policy
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/5429a217
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/5429a217
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/5429a217
Branch: refs/heads/CURATOR-248
Commit: 5429a217bb23901aaf2b187bb8c1d760d0a76bcc
Parents: 94dff8a
Author: randgalt <ra...@apache.org>
Authored: Mon Aug 24 17:39:41 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Aug 24 17:39:41 2015 -0500
----------------------------------------------------------------------
.../framework/recipes/leader/LeaderLatch.java | 39 +++--
.../recipes/leader/TestLeaderLatch.java | 162 +++++++++++++++++++
2 files changed, 187 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/5429a217/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index da9b8b2..aa4dd9f 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -160,20 +160,20 @@ public class LeaderLatch implements Closeable
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
startTask.set(AfterConnectionEstablished.execute(client, new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
{
- @Override
- public void run()
- {
- try
- {
- internalStart();
- }
- finally
- {
- startTask.set(null);
- }
- }
- }));
+ internalStart();
+ }
+ finally
+ {
+ startTask.set(null);
+ }
+ }
+ }));
}
/**
@@ -604,7 +604,10 @@ public class LeaderLatch implements Closeable
{
try
{
- reset();
+ if ( client.getErrorPolicy().isErrorState(ConnectionState.SUSPENDED) || !hasLeadership.get() )
+ {
+ reset();
+ }
}
catch ( Exception e )
{
@@ -615,6 +618,14 @@ public class LeaderLatch implements Closeable
}
case SUSPENDED:
+ {
+ if ( client.getErrorPolicy().isErrorState(ConnectionState.SUSPENDED) )
+ {
+ setLeadership(false);
+ }
+ break;
+ }
+
case LOST:
{
setLeadership(false);
http://git-wip-us.apache.org/repos/asf/curator/blob/5429a217/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index 3742fb7..bd73e9d 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -21,12 +21,15 @@ package org.apache.curator.framework.recipes.leader;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.framework.state.SessionErrorPolicy;
+import org.apache.curator.framework.state.StandardErrorPolicy;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
@@ -37,11 +40,13 @@ import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -53,6 +58,163 @@ public class TestLeaderLatch extends BaseClassForTests
private static final int MAX_LOOPS = 5;
@Test
+ public void testSessionErrorPolicy() throws Exception
+ {
+ Timing timing = new Timing();
+ LeaderLatch latch = null;
+ CuratorFramework client = null;
+ for ( int i = 0; i < 2; ++i )
+ {
+ boolean isSessionIteration = (i == 0);
+ try
+ {
+ client = CuratorFrameworkFactory.builder()
+ .connectString(server.getConnectString())
+ .connectionTimeoutMs(10000)
+ .sessionTimeoutMs(60000)
+ .retryPolicy(new RetryOneTime(1))
+ .errorPolicy(isSessionIteration ? new SessionErrorPolicy() : new StandardErrorPolicy())
+ .build();
+ final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+ ConnectionStateListener stateListener = new ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ states.add(newState.name());
+ }
+ };
+ client.getConnectionStateListenable().addListener(stateListener);
+ client.start();
+
+ latch = new LeaderLatch(client, "/test");
+ LeaderLatchListener listener = new LeaderLatchListener()
+ {
+ @Override
+ public void isLeader()
+ {
+ states.add("true");
+ }
+
+ @Override
+ public void notLeader()
+ {
+ states.add("false");
+ }
+ };
+ latch.addListener(listener);
+ latch.start();
+ Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+ Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true");
+ server.stop();
+ if ( isSessionIteration )
+ {
+ Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED.name());
+ server.restart();
+ Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED.name());
+ Assert.assertNull(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS));
+ }
+ else
+ {
+ String s = states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS);
+ Assert.assertTrue("false".equals(s) || ConnectionState.SUSPENDED.name().equals(s));
+ s = states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS);
+ Assert.assertTrue("false".equals(s) || ConnectionState.SUSPENDED.name().equals(s));
+ server.restart();
+ Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED.name());
+ Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true");
+ }
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(latch);
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+ }
+
+ @Test
+ public void testErrorPolicies() throws Exception
+ {
+ Timing timing = new Timing();
+ LeaderLatch latch = null;
+ CuratorFramework client = CuratorFrameworkFactory.builder()
+ .connectString(server.getConnectString())
+ .connectionTimeoutMs(1000)
+ .sessionTimeoutMs(timing.session())
+ .retryPolicy(new RetryOneTime(1))
+ .errorPolicy(new StandardErrorPolicy())
+ .build();
+ try
+ {
+ final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+ ConnectionStateListener stateListener = new ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ states.add(newState.name());
+ }
+ };
+ client.getConnectionStateListenable().addListener(stateListener);
+ client.start();
+ latch = new LeaderLatch(client, "/test");
+ LeaderLatchListener listener = new LeaderLatchListener()
+ {
+ @Override
+ public void isLeader()
+ {
+ states.add("true");
+ }
+
+ @Override
+ public void notLeader()
+ {
+ states.add("false");
+ }
+ };
+ latch.addListener(listener);
+ latch.start();
+ Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+ Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true");
+ server.close();
+ Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED.name());
+ Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "false");
+ Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST.name());
+ latch.close();
+ client.close();
+
+ timing.sleepABit();
+ states.clear();
+
+ server = new TestingServer();
+ client = CuratorFrameworkFactory.builder()
+ .connectString(server.getConnectString())
+ .connectionTimeoutMs(1000)
+ .sessionTimeoutMs(timing.session())
+ .retryPolicy(new RetryOneTime(1))
+ .errorPolicy(new SessionErrorPolicy())
+ .build();
+ client.getConnectionStateListenable().addListener(stateListener);
+ client.start();
+ latch = new LeaderLatch(client, "/test");
+ latch.addListener(listener);
+ latch.start();
+ Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+ Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true");
+ server.close();
+ Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED.name());
+ Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST.name());
+ Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "false");
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(latch);
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
public void testProperCloseWithoutConnectionEstablished() throws Exception
{
server.stop();