You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/04/21 20:22:53 UTC

git commit: Major issue! Thread that was canceled due to CancelLeadershipException was getting re-used and thus exiting immediately. Instead, a new thread from the pool should be used

Repository: curator
Updated Branches:
  refs/heads/CURATOR-104 [created] 62494bd63


Major issue! Thread that was canceled due to CancelLeadershipException was getting re-used and thus exiting immediately. Instead, a new thread from the pool should be used


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/62494bd6
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/62494bd6
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/62494bd6

Branch: refs/heads/CURATOR-104
Commit: 62494bd639bf02ed3f654b98270a3c082902f923
Parents: 99a1b7c
Author: randgalt <ra...@apache.org>
Authored: Mon Apr 21 13:22:40 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Apr 21 13:22:40 2014 -0500

----------------------------------------------------------------------
 .../recipes/leader/LeaderSelector.java          |  61 +++++------
 .../recipes/leader/TestLeaderSelector.java      | 101 +++++++++++++++++++
 2 files changed, 132 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/62494bd6/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
index ac10733..1a2470c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
@@ -224,11 +224,15 @@ public class LeaderSelector implements Closeable
      *
      * @return true if re-queue is successful
      */
-    public synchronized boolean requeue()
+    public boolean requeue()
     {
         Preconditions.checkState(state.get() == State.STARTED, "close() has already been called");
+        return internalRequeue();
+    }
 
-        if ( !isQueued )
+    public synchronized boolean internalRequeue()
+    {
+        if ( !isQueued && (state.get() == State.STARTED) )
         {
             isQueued = true;
             Future<Void> task = executorService.submit(new Callable<Void>()
@@ -243,6 +247,10 @@ public class LeaderSelector implements Closeable
                     finally
                     {
                         clearIsQueued();
+                        if ( autoRequeue.get() )
+                        {
+                            internalRequeue();
+                        }
                     }
                     return null;
                 }
@@ -393,6 +401,7 @@ public class LeaderSelector implements Closeable
             catch ( InterruptedException e )
             {
                 Thread.currentThread().interrupt();
+                throw e;
             }
             catch ( Throwable e )
             {
@@ -406,6 +415,7 @@ public class LeaderSelector implements Closeable
         catch ( InterruptedException e )
         {
             Thread.currentThread().interrupt();
+            throw e;
         }
         catch ( Exception e )
         {
@@ -428,36 +438,27 @@ public class LeaderSelector implements Closeable
 
     private void doWorkLoop() throws Exception
     {
-        do
+        KeeperException exception = null;
+        try
         {
-            KeeperException exception = null;
-            try
-            {
-                doWork();
-            }
-            catch ( KeeperException.ConnectionLossException e )
-            {
-                exception = e;
-            }
-            catch ( KeeperException.SessionExpiredException e )
-            {
-                exception = e;
-            }
-            catch ( InterruptedException ignore )
-            {
-                Future<?> task = ourTask.get();
-                if ( (task == null) || !task.isCancelled() )    // if interruptLeadership() was called, not re-set the interrupt state of the thread
-                {
-                    Thread.currentThread().interrupt();
-                }
-                break;
-            }
-            if ( (exception != null) && !autoRequeue.get() )   // autoRequeue should ignore connection loss or session expired and just keep trying
-            {
-                throw exception;
-            }
+            doWork();
+        }
+        catch ( KeeperException.ConnectionLossException e )
+        {
+            exception = e;
+        }
+        catch ( KeeperException.SessionExpiredException e )
+        {
+            exception = e;
+        }
+        catch ( InterruptedException ignore )
+        {
+            Thread.currentThread().interrupt();
+        }
+        if ( (exception != null) && !autoRequeue.get() )   // autoRequeue should ignore connection loss or session expired and just keep trying
+        {
+            throw exception;
         }
-        while ( autoRequeue.get() && (state.get() == State.STARTED) && !Thread.currentThread().isInterrupted() );
     }
 
     private synchronized void clearIsQueued()

http://git-wip-us.apache.org/repos/asf/curator/blob/62494bd6/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
index 1ae041b..a4ae2ba 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
@@ -371,6 +371,107 @@ public class TestLeaderSelector extends BaseClassForTests
         }
     }
 
+    /**
+     * This is similar to TestLeaderSelector.testKillSessionThenCloseShouldElectNewLeader
+     * The differences are:
+     * it restarts the TestingServer instead of killing the session
+     * it uses autoRequeue instead of explicitly calling requeue
+     */
+    @Test
+    public void testKillServerThenCloseShouldElectNewLeader() throws Exception
+    {
+        final Timing timing = new Timing();
+
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            final Semaphore semaphore = new Semaphore(0);
+            final CountDownLatch interruptedLatch = new CountDownLatch(1);
+            final AtomicInteger leaderCount = new AtomicInteger(0);
+            LeaderSelectorListener listener = new LeaderSelectorListenerAdapter()
+            {
+                @Override
+                public void takeLeadership(CuratorFramework client) throws Exception
+                {
+                    leaderCount.incrementAndGet();
+                    try
+                    {
+                        semaphore.release();
+                        try
+                        {
+                            Thread.currentThread().join();
+                        }
+                        catch ( InterruptedException e )
+                        {
+                            Thread.currentThread().interrupt();
+                            interruptedLatch.countDown();
+                        }
+                    }
+                    finally
+                    {
+                        leaderCount.decrementAndGet();
+                    }
+                }
+            };
+            LeaderSelector leaderSelector1 = new LeaderSelector(client, PATH_NAME, listener);
+            LeaderSelector leaderSelector2 = new LeaderSelector(client, PATH_NAME, listener);
+
+            boolean leaderSelector1Closed = false;
+            boolean leaderSelector2Closed = false;
+
+            leaderSelector1.autoRequeue();
+            leaderSelector2.autoRequeue();
+
+            leaderSelector1.start();
+            leaderSelector2.start();
+
+            Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
+
+            int port = server.getPort();
+            server.stop();
+            timing.sleepABit();
+            server = new TestingServer(port);
+            Assert.assertTrue(timing.awaitLatch(interruptedLatch));
+            timing.sleepABit();
+
+            Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
+            Assert.assertEquals(leaderCount.get(), 1);
+
+            if ( leaderSelector1.hasLeadership() )
+            {
+                leaderSelector1.close();
+                leaderSelector1Closed = true;
+            }
+            else if ( leaderSelector2.hasLeadership() )
+            {
+                leaderSelector2.close();
+                leaderSelector2Closed = true;
+            }
+            else
+            {
+                fail("No leaderselector has leadership!");
+            }
+
+            // Verify that the other leader took over leadership.
+            Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
+            Assert.assertEquals(leaderCount.get(), 1);
+
+            if ( !leaderSelector1Closed )
+            {
+                leaderSelector1.close();
+            }
+            if ( !leaderSelector2Closed )
+            {
+                leaderSelector2.close();
+            }
+        }
+        finally
+        {
+            client.close();
+        }
+    }
+
     @Test
     public void testClosing() throws Exception
     {