You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/04/21 20:22:53 UTC
git commit: Major issue! Thread that was canceled due to
CancelLeadershipException was getting re-used and thus exiting immediately.
Instead, a new thread from the pool should be used
Repository: curator
Updated Branches:
refs/heads/CURATOR-104 [created] 62494bd63
Major issue! Thread that was canceled due to CancelLeadershipException was getting re-used and thus exiting immediately. Instead, a new thread from the pool should be used
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/62494bd6
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/62494bd6
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/62494bd6
Branch: refs/heads/CURATOR-104
Commit: 62494bd639bf02ed3f654b98270a3c082902f923
Parents: 99a1b7c
Author: randgalt <ra...@apache.org>
Authored: Mon Apr 21 13:22:40 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Apr 21 13:22:40 2014 -0500
----------------------------------------------------------------------
.../recipes/leader/LeaderSelector.java | 61 +++++------
.../recipes/leader/TestLeaderSelector.java | 101 +++++++++++++++++++
2 files changed, 132 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/62494bd6/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
index ac10733..1a2470c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
@@ -224,11 +224,15 @@ public class LeaderSelector implements Closeable
*
* @return true if re-queue is successful
*/
- public synchronized boolean requeue()
+ public boolean requeue()
{
Preconditions.checkState(state.get() == State.STARTED, "close() has already been called");
+ return internalRequeue();
+ }
- if ( !isQueued )
+ public synchronized boolean internalRequeue()
+ {
+ if ( !isQueued && (state.get() == State.STARTED) )
{
isQueued = true;
Future<Void> task = executorService.submit(new Callable<Void>()
@@ -243,6 +247,10 @@ public class LeaderSelector implements Closeable
finally
{
clearIsQueued();
+ if ( autoRequeue.get() )
+ {
+ internalRequeue();
+ }
}
return null;
}
@@ -393,6 +401,7 @@ public class LeaderSelector implements Closeable
catch ( InterruptedException e )
{
Thread.currentThread().interrupt();
+ throw e;
}
catch ( Throwable e )
{
@@ -406,6 +415,7 @@ public class LeaderSelector implements Closeable
catch ( InterruptedException e )
{
Thread.currentThread().interrupt();
+ throw e;
}
catch ( Exception e )
{
@@ -428,36 +438,27 @@ public class LeaderSelector implements Closeable
private void doWorkLoop() throws Exception
{
- do
+ KeeperException exception = null;
+ try
{
- KeeperException exception = null;
- try
- {
- doWork();
- }
- catch ( KeeperException.ConnectionLossException e )
- {
- exception = e;
- }
- catch ( KeeperException.SessionExpiredException e )
- {
- exception = e;
- }
- catch ( InterruptedException ignore )
- {
- Future<?> task = ourTask.get();
- if ( (task == null) || !task.isCancelled() ) // if interruptLeadership() was called, not re-set the interrupt state of the thread
- {
- Thread.currentThread().interrupt();
- }
- break;
- }
- if ( (exception != null) && !autoRequeue.get() ) // autoRequeue should ignore connection loss or session expired and just keep trying
- {
- throw exception;
- }
+ doWork();
+ }
+ catch ( KeeperException.ConnectionLossException e )
+ {
+ exception = e;
+ }
+ catch ( KeeperException.SessionExpiredException e )
+ {
+ exception = e;
+ }
+ catch ( InterruptedException ignore )
+ {
+ Thread.currentThread().interrupt();
+ }
+ if ( (exception != null) && !autoRequeue.get() ) // autoRequeue should ignore connection loss or session expired and just keep trying
+ {
+ throw exception;
}
- while ( autoRequeue.get() && (state.get() == State.STARTED) && !Thread.currentThread().isInterrupted() );
}
private synchronized void clearIsQueued()
http://git-wip-us.apache.org/repos/asf/curator/blob/62494bd6/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
index 1ae041b..a4ae2ba 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
@@ -371,6 +371,107 @@ public class TestLeaderSelector extends BaseClassForTests
}
}
+ /**
+ * This is similar to TestLeaderSelector.testKillSessionThenCloseShouldElectNewLeader
+ * The differences are:
+ * it restarts the TestingServer instead of killing the session
+ * it uses autoRequeue instead of explicitly calling requeue
+ */
+ @Test
+ public void testKillServerThenCloseShouldElectNewLeader() throws Exception
+ {
+ final Timing timing = new Timing();
+
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ client.start();
+ try
+ {
+ final Semaphore semaphore = new Semaphore(0);
+ final CountDownLatch interruptedLatch = new CountDownLatch(1);
+ final AtomicInteger leaderCount = new AtomicInteger(0);
+ LeaderSelectorListener listener = new LeaderSelectorListenerAdapter()
+ {
+ @Override
+ public void takeLeadership(CuratorFramework client) throws Exception
+ {
+ leaderCount.incrementAndGet();
+ try
+ {
+ semaphore.release();
+ try
+ {
+ Thread.currentThread().join();
+ }
+ catch ( InterruptedException e )
+ {
+ Thread.currentThread().interrupt();
+ interruptedLatch.countDown();
+ }
+ }
+ finally
+ {
+ leaderCount.decrementAndGet();
+ }
+ }
+ };
+ LeaderSelector leaderSelector1 = new LeaderSelector(client, PATH_NAME, listener);
+ LeaderSelector leaderSelector2 = new LeaderSelector(client, PATH_NAME, listener);
+
+ boolean leaderSelector1Closed = false;
+ boolean leaderSelector2Closed = false;
+
+ leaderSelector1.autoRequeue();
+ leaderSelector2.autoRequeue();
+
+ leaderSelector1.start();
+ leaderSelector2.start();
+
+ Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
+
+ int port = server.getPort();
+ server.stop();
+ timing.sleepABit();
+ server = new TestingServer(port);
+ Assert.assertTrue(timing.awaitLatch(interruptedLatch));
+ timing.sleepABit();
+
+ Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
+ Assert.assertEquals(leaderCount.get(), 1);
+
+ if ( leaderSelector1.hasLeadership() )
+ {
+ leaderSelector1.close();
+ leaderSelector1Closed = true;
+ }
+ else if ( leaderSelector2.hasLeadership() )
+ {
+ leaderSelector2.close();
+ leaderSelector2Closed = true;
+ }
+ else
+ {
+ fail("No leaderselector has leadership!");
+ }
+
+ // Verify that the other leader took over leadership.
+ Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
+ Assert.assertEquals(leaderCount.get(), 1);
+
+ if ( !leaderSelector1Closed )
+ {
+ leaderSelector1.close();
+ }
+ if ( !leaderSelector2Closed )
+ {
+ leaderSelector2.close();
+ }
+ }
+ finally
+ {
+ client.close();
+ }
+ }
+
@Test
public void testClosing() throws Exception
{