You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ti...@apache.org on 2023/03/13 05:41:25 UTC

[curator] branch master updated: CURATOR-518: Fix LeaderSelector requeue broken by interruptLeadership (#446)

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git


The following commit(s) were added to refs/heads/master by this push:
     new 34432073 CURATOR-518: Fix LeaderSelector requeue broken by interruptLeadership (#446)
34432073 is described below

commit 34432073079012fe1d45ab5ff6c15607d95b43d8
Author: Kezhu Wang <ke...@gmail.com>
AuthorDate: Mon Mar 13 13:41:17 2023 +0800

    CURATOR-518: Fix LeaderSelector requeue broken by interruptLeadership (#446)
---
 .../framework/recipes/leader/LeaderSelector.java   | 78 ++++++++++++++--------
 .../recipes/leader/LeaderSelectorListener.java     | 12 +++-
 .../recipes/leader/TestLeaderSelector.java         | 52 ++++++++++++++-
 3 files changed, 112 insertions(+), 30 deletions(-)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
index 4a331763..4d00c4ce 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
@@ -72,7 +72,10 @@ public class LeaderSelector implements Closeable
     private final InterProcessMutex mutex;
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
     private final AtomicBoolean autoRequeue = new AtomicBoolean(false);
-    private final AtomicReference<Future<?>> ourTask = new AtomicReference<Future<?>>(null);
+
+    // guarded by synchronization
+    private Future<?> ourTask = null;
+    private Thread ourThread = null;
 
     private volatile boolean hasLeadership;
     private volatile String id = "";
@@ -88,9 +91,6 @@ public class LeaderSelector implements Closeable
         CLOSED
     }
 
-    // guarded by synchronization
-    private boolean isQueued = false;
-
     private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("LeaderSelector");
 
     /**
@@ -224,6 +224,10 @@ public class LeaderSelector implements Closeable
      * happens and false is returned. If the instance was not queued, it is re-queued and true
      * is returned
      *
+     * <p>The attempt finishes after a session error or a leadership release. This method is inherently
+     * hard to use as there is no public API to guarantee a successful requeue. Try {@link #autoRequeue()}
+     * if you are in doubt.</p>
+     *
      * @return true if re-queue is successful
      */
     public boolean requeue()
@@ -234,30 +238,25 @@ public class LeaderSelector implements Closeable
 
     private synchronized boolean internalRequeue()
     {
-        if ( !isQueued && (state.get() == State.STARTED) )
+        if ( ourTask == null && (state.get() == State.STARTED) )
         {
-            isQueued = true;
-            Future<Void> task = executorService.submit(new Callable<Void>()
+            ourTask = executorService.submit(new Callable<Void>()
             {
                 @Override
                 public Void call() throws Exception
                 {
                     try
                     {
+                        taskStarted();
                         doWorkLoop();
                     }
                     finally
                     {
-                        clearIsQueued();
-                        if ( autoRequeue.get() )
-                        {
-                            internalRequeue();
-                        }
+                        taskDone();
                     }
                     return null;
                 }
             });
-            ourTask.set(task);
 
             return true;
         }
@@ -273,7 +272,7 @@ public class LeaderSelector implements Closeable
 
         client.getConnectionStateListenable().removeListener(listener);
         executorService.close();
-        ourTask.set(null);
+        ourTask = null;
     }
 
     /**
@@ -374,15 +373,39 @@ public class LeaderSelector implements Closeable
         return hasLeadership;
     }
 
+    private synchronized void taskStarted() {
+        ourThread = Thread.currentThread();
+    }
+
+    private synchronized void taskDone() {
+        ourTask = null;
+        ourThread = null;
+        if (autoRequeue.get()) {
+            internalRequeue();
+        }
+    }
+
+    /**
+     * Cancel ongoing election regardless of leadership.
+     */
+    private synchronized void cancelElection() {
+        // Correctness with requeue:
+        // * Cancel, taskStarted and taskDone are guarded by synchronized(this).
+        // * If ourThread is null, new task will observe this cancellation after taskStarted.
+        // * If ourThread is not null, old task will be cancelled and new task will observe
+        //   this cancellation.
+        if (ourThread != null) {
+            ourThread.interrupt();
+        }
+    }
+
     /**
      * Attempt to cancel and interrupt the current leadership if this instance has leadership
      */
     public synchronized void interruptLeadership()
     {
-        Future<?> task = ourTask.get();
-        if ( task != null )
-        {
-            task.cancel(true);
+        if (hasLeadership) {
+            cancelElection();
         }
     }
 
@@ -403,6 +426,10 @@ public class LeaderSelector implements Closeable
     @VisibleForTesting
     volatile AtomicInteger failedMutexReleaseCount = null;
 
+    /**
+     * This method must not be called concurrently, in order to honor the guarantee made by
+     * {@link LeaderSelectorListener#takeLeadership(CuratorFramework)}.
+     */
     @VisibleForTesting
     void doWork() throws Exception
     {
@@ -433,10 +460,6 @@ public class LeaderSelector implements Closeable
             {
                 ThreadUtils.checkInterrupted(e);
             }
-            finally
-            {
-                clearIsQueued();
-            }
         }
         catch ( InterruptedException e )
         {
@@ -500,11 +523,6 @@ public class LeaderSelector implements Closeable
         }
     }
 
-    private synchronized void clearIsQueued()
-    {
-        isQueued = false;
-    }
-
     // temporary wrapper for deprecated constructor
     private static ExecutorService wrapExecutor(final Executor executor)
     {
@@ -585,7 +603,11 @@ public class LeaderSelector implements Closeable
             }
             catch ( CancelLeadershipException dummy )
             {
-                leaderSelector.interruptLeadership();
+                // If we cancel only the leadership but not the whole election, then we could hand over
+                // outdated leadership to the client with no further cancellation. Outdated leadership is
+                // possible because leadership acquisition takes separate steps: changing server data
+                // (e.g. election sequence) and setting the client flag (e.g. hasLeadership).
+                leaderSelector.cancelElection();
             }
         }
     }
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelectorListener.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelectorListener.java
index b77cc432..fabdded5 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelectorListener.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelectorListener.java
@@ -19,6 +19,7 @@
 package org.apache.curator.framework.recipes.leader;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 
 /**
@@ -30,7 +31,16 @@ public interface LeaderSelectorListener extends ConnectionStateListener
 {
     /**
      * Called when your instance has been granted leadership. This method
-     * should not return until you wish to release leadership
+     * should not return until you wish to release leadership.
+     *
+     * <p>It is guaranteed that there are no concurrent executions of this
+     * method.</p>
+     *
+     * <p>It is guaranteed that this method will be interrupted if
+     * {@link #stateChanged(CuratorFramework, ConnectionState)}
+     * throws {@link CancelLeadershipException}. After being interrupted, this
+     * method should exit (either return or throw) promptly, otherwise it
+     * will block subsequent elections.</p>
      *
      * @param client the client
      * @throws Exception any errors
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
index 44e02690..541f4c5c 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
@@ -46,8 +46,11 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.AbstractExecutorService;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -257,6 +260,48 @@ public class TestLeaderSelector extends BaseClassForTests
         }
     }
 
+    private static class DelayedExecutorService extends AbstractExecutorService {
+        private final ExecutorService executor = Executors.newFixedThreadPool(4);
+
+        @Override
+        public void shutdown() {
+            executor.shutdown();
+        }
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            return executor.shutdownNow();
+        }
+
+        @Override
+        public boolean isShutdown() {
+            return executor.isShutdown();
+        }
+
+        @Override
+        public boolean isTerminated() {
+            return executor.isTerminated();
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+            return executor.awaitTermination(timeout, unit);
+        }
+
+        @Override
+        public void execute(Runnable command) {
+            executor.execute(() -> {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException ignored) {
+                    // Mimic cancel before execution.
+                    return;
+                }
+                command.run();
+            });
+        }
+    }
+
     @Test
     public void testInterruptLeadershipWithRequeue() throws Exception
     {
@@ -278,11 +323,16 @@ public class TestLeaderSelector extends BaseClassForTests
                     Thread.currentThread().join();
                 }
             };
-            selector = new LeaderSelector(client, "/leader", listener);
+            selector = new LeaderSelector(client, "/leader", new DelayedExecutorService(), listener);
             selector.autoRequeue();
             selector.start();
 
+            // There is only one participant, so semaphore acquire should succeed in finite
+            // duration no matter how we interrupt.
             assertTrue(timing.acquireSemaphore(semaphore));
+            Thread.sleep(20);
+            selector.interruptLeadership();
+            Thread.sleep(20);
             selector.interruptLeadership();
 
             assertTrue(timing.acquireSemaphore(semaphore));