You are viewing a plain text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@curator.apache.org by ti...@apache.org on 2023/03/13 05:41:25 UTC
[curator] branch master updated: CURATOR-518: Fix LeaderSelector requeue broken by interruptLeadership (#446)
This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git
The following commit(s) were added to refs/heads/master by this push:
new 34432073 CURATOR-518: Fix LeaderSelector requeue broken by interruptLeadership (#446)
34432073 is described below
commit 34432073079012fe1d45ab5ff6c15607d95b43d8
Author: Kezhu Wang <ke...@gmail.com>
AuthorDate: Mon Mar 13 13:41:17 2023 +0800
CURATOR-518: Fix LeaderSelector requeue broken by interruptLeadership (#446)
---
.../framework/recipes/leader/LeaderSelector.java | 78 ++++++++++++++--------
.../recipes/leader/LeaderSelectorListener.java | 12 +++-
.../recipes/leader/TestLeaderSelector.java | 52 ++++++++++++++-
3 files changed, 112 insertions(+), 30 deletions(-)
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
index 4a331763..4d00c4ce 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
@@ -72,7 +72,10 @@ public class LeaderSelector implements Closeable
private final InterProcessMutex mutex;
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
private final AtomicBoolean autoRequeue = new AtomicBoolean(false);
- private final AtomicReference<Future<?>> ourTask = new AtomicReference<Future<?>>(null);
+
+ // guarded by synchronization
+ private Future<?> ourTask = null;
+ private Thread ourThread = null;
private volatile boolean hasLeadership;
private volatile String id = "";
@@ -88,9 +91,6 @@ public class LeaderSelector implements Closeable
CLOSED
}
- // guarded by synchronization
- private boolean isQueued = false;
-
private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("LeaderSelector");
/**
@@ -224,6 +224,10 @@ public class LeaderSelector implements Closeable
* happens and false is returned. If the instance was not queued, it is re-queued and true
* is returned
*
+ * <p>The queued attempt finishes after a session error or once leadership is released. This method
+ * is inherently hard to use because no public API guarantees that a requeue succeeds. Prefer
+ * {@link #autoRequeue()} if you are in doubt.</p>
+ *
* @return true if re-queue is successful
*/
public boolean requeue()
@@ -234,30 +238,25 @@ public class LeaderSelector implements Closeable
private synchronized boolean internalRequeue()
{
- if ( !isQueued && (state.get() == State.STARTED) )
+ if ( ourTask == null && (state.get() == State.STARTED) )
{
- isQueued = true;
- Future<Void> task = executorService.submit(new Callable<Void>()
+ ourTask = executorService.submit(new Callable<Void>()
{
@Override
public Void call() throws Exception
{
try
{
+ taskStarted();
doWorkLoop();
}
finally
{
- clearIsQueued();
- if ( autoRequeue.get() )
- {
- internalRequeue();
- }
+ taskDone();
}
return null;
}
});
- ourTask.set(task);
return true;
}
@@ -273,7 +272,7 @@ public class LeaderSelector implements Closeable
client.getConnectionStateListenable().removeListener(listener);
executorService.close();
- ourTask.set(null);
+ ourTask = null;
}
/**
@@ -374,15 +373,39 @@ public class LeaderSelector implements Closeable
return hasLeadership;
}
+ private synchronized void taskStarted() { // record the thread about to run doWorkLoop() so cancelElection() can interrupt it
+ ourThread = Thread.currentThread();
+ }
+
+ private synchronized void taskDone() { // called from the task's finally block: clear task state, then requeue if autoRequeue is set
+ ourTask = null; // lets internalRequeue() submit a fresh task
+ ourThread = null; // nothing left for cancelElection() to interrupt
+ if (autoRequeue.get()) {
+ internalRequeue();
+ }
+ }
+
+ /**
+ * Cancel the ongoing election, regardless of whether this instance currently holds leadership.
+ */
+ private synchronized void cancelElection() {
+ // Correctness with requeue:
+ // * cancelElection, taskStarted and taskDone are all guarded by synchronized(this).
+ // * If ourThread is null, no task is running yet; a task that starts later runs its whole election after this cancellation.
+ // * If ourThread is not null, the running task's thread is interrupted, and any new task starts
+ // only after taskDone, i.e. also after this cancellation.
+ if (ourThread != null) {
+ ourThread.interrupt();
+ }
+ }
+
/**
* Attempt to cancel and interrupt the current leadership if this instance has leadership
*/
public synchronized void interruptLeadership()
{
- Future<?> task = ourTask.get();
- if ( task != null )
- {
- task.cancel(true);
+ if (hasLeadership) { // no-op unless this instance currently holds leadership
+ cancelElection();
}
}
@@ -403,6 +426,10 @@ public class LeaderSelector implements Closeable
@VisibleForTesting
volatile AtomicInteger failedMutexReleaseCount = null;
+ /**
+ * This method must not be called concurrently, so that the guarantee documented on
+ * {@link LeaderSelectorListener#takeLeadership(CuratorFramework)} holds.
+ */
@VisibleForTesting
void doWork() throws Exception
{
@@ -433,10 +460,6 @@ public class LeaderSelector implements Closeable
{
ThreadUtils.checkInterrupted(e);
}
- finally
- {
- clearIsQueued();
- }
}
catch ( InterruptedException e )
{
@@ -500,11 +523,6 @@ public class LeaderSelector implements Closeable
}
}
- private synchronized void clearIsQueued()
- {
- isQueued = false;
- }
-
// temporary wrapper for deprecated constructor
private static ExecutorService wrapExecutor(final Executor executor)
{
@@ -585,7 +603,11 @@ public class LeaderSelector implements Closeable
}
catch ( CancelLeadershipException dummy )
{
- leaderSelector.interruptLeadership();
+ // If we cancelled only the leadership and not the whole election, we could hand stale
+ // leadership to the client with no further cancellation. Stale leadership is possible
+ // because acquiring leadership takes separate steps: the server-side data (e.g. election
+ // sequence) changes, and the client-side flag (e.g. hasLeadership) is set.
+ leaderSelector.cancelElection();
}
}
}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelectorListener.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelectorListener.java
index b77cc432..fabdded5 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelectorListener.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelectorListener.java
@@ -19,6 +19,7 @@
package org.apache.curator.framework.recipes.leader;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
/**
@@ -30,7 +31,16 @@ public interface LeaderSelectorListener extends ConnectionStateListener
{
/**
* Called when your instance has been granted leadership. This method
- * should not return until you wish to release leadership
+ * should not return until you wish to release leadership.
+ *
+ * <p>It is guaranteed that this method is never executed
+ * concurrently.</p>
+ *
+ * <p>It is guaranteed that this method will be interrupted if
+ * {@link #stateChanged(CuratorFramework, ConnectionState)}
+ * throws {@link CancelLeadershipException}. Once interrupted, this
+ * method should exit (either return or throw) promptly; otherwise it
+ * will block subsequent elections.</p>
*
* @param client the client
* @throws Exception any errors
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
index 44e02690..541f4c5c 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
@@ -46,8 +46,11 @@ import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -257,6 +260,48 @@ public class TestLeaderSelector extends BaseClassForTests
}
}
+ private static class DelayedExecutorService extends AbstractExecutorService { // test executor: delays every task by 100 ms before running it
+ private final ExecutorService executor = Executors.newFixedThreadPool(4);
+
+ @Override
+ public void shutdown() {
+ executor.shutdown();
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ return executor.shutdownNow();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return executor.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return executor.isTerminated();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return executor.awaitTermination(timeout, unit);
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ executor.execute(() -> {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ignored) {
+ // Interrupted during the delay: drop the task without running it, mimicking cancellation before execution.
+ return;
+ }
+ command.run();
+ });
+ }
+ }
+
@Test
public void testInterruptLeadershipWithRequeue() throws Exception
{
@@ -278,11 +323,16 @@ public class TestLeaderSelector extends BaseClassForTests
Thread.currentThread().join();
}
};
- selector = new LeaderSelector(client, "/leader", listener);
+ selector = new LeaderSelector(client, "/leader", new DelayedExecutorService(), listener);
selector.autoRequeue();
selector.start();
+ // There is only one participant, so the semaphore acquire should succeed within a finite
+ // time no matter how we interrupt.
assertTrue(timing.acquireSemaphore(semaphore));
+ Thread.sleep(20);
+ selector.interruptLeadership();
+ Thread.sleep(20);
selector.interruptLeadership();
assertTrue(timing.acquireSemaphore(semaphore));