You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2019/07/12 13:40:03 UTC
[flink] 03/03: [FLINK-12804] Change mailbox implementation from
bounded to unbounded
This is an automated email from the ASF dual-hosted git repository.
srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6cf98b671d889f1003172bc656ded69d1aab9235
Author: Stefan Richter <s....@data-artisans.com>
AuthorDate: Wed Jul 3 13:32:46 2019 +0200
[FLINK-12804] Change mailbox implementation from bounded to unbounded
This closes #8692.
---
.../flink/streaming/runtime/tasks/StreamTask.java | 2 +-
.../streaming/runtime/tasks/mailbox/Mailbox.java | 37 +---
.../runtime/tasks/mailbox/MailboxImpl.java | 201 +++++----------------
.../runtime/tasks/mailbox/MailboxSender.java | 13 +-
.../tasks/mailbox/execution/MailboxExecutor.java | 17 +-
.../execution/MailboxExecutorServiceImpl.java | 20 --
.../tasks/mailbox/execution/MailboxProcessor.java | 56 ++----
.../execution/SuspendedMailboxDefaultAction.java | 2 +-
.../runtime/tasks/mailbox/MailboxImplTest.java | 88 ++-------
.../execution/MailboxExecutorServiceImplTest.java | 6 +-
.../mailbox/execution/MailboxProcessorTest.java | 13 +-
.../mailbox/execution/TestMailboxExecutor.java | 6 -
12 files changed, 92 insertions(+), 369 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index a626bba..52e2011 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -484,7 +484,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
cancelTask();
}
finally {
- mailboxProcessor.cancelMailboxExecution();
+ mailboxProcessor.allActionsCompleted();
cancelables.close();
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
index 9b2d95f..deb69e0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
@@ -23,7 +23,7 @@ import javax.annotation.Nonnull;
import java.util.List;
/**
- * A mailbox is basically a blocking queue for inter-thread message exchange in form of {@link Runnable} objects between
+ * A mailbox is basically a queue for inter-thread message exchange in form of {@link Runnable} objects between
* multiple producer threads and a single consumer. This has a lifecycle of closed -> open -> (quiesced) -> closed.
*/
public interface Mailbox extends MailboxReceiver, MailboxSender {
@@ -56,36 +56,12 @@ public interface Mailbox extends MailboxReceiver, MailboxSender {
List<Runnable> close();
/**
- * The effect of this is that all pending letters in the mailbox are dropped and the given priorityLetter
- * is enqueued to the head of the mailbox. Dropped letters are returned. This method should only be invoked
- * by code that has ownership of the mailbox object and only rarely used, e.g. to submit special events like
- * shutting down the mailbox loop.
- *
- * @param priorityLetter action to enqueue atomically after the mailbox was cleared.
- * @throws MailboxStateException if the mailbox is quiesced or closed.
- */
- @Nonnull
- List<Runnable> clearAndPut(@Nonnull Runnable priorityLetter) throws MailboxStateException;
-
- /**
- * Adds the given action to the head of the mailbox. This method will block if the mailbox is full and
- * should therefore only be called from outside the mailbox main-thread to avoid deadlocks.
- *
- * @param priorityLetter action to enqueue to the head of the mailbox.
- * @throws InterruptedException on interruption.
- * @throws MailboxStateException if the mailbox is quiesced or closed.
- */
- void putFirst(@Nonnull Runnable priorityLetter) throws InterruptedException, MailboxStateException;
-
- /**
- * Adds the given action to the head of the mailbox if the mailbox is not full. Returns true if the letter
- * was successfully added to the mailbox.
+ * Adds the given action to the head of the mailbox.
*
* @param priorityLetter action to enqueue to the head of the mailbox.
- * @return true if the letter was successfully added.
* @throws MailboxStateException if the mailbox is quiesced or closed.
*/
- boolean tryPutFirst(@Nonnull Runnable priorityLetter) throws MailboxStateException;
+ void putFirst(@Nonnull Runnable priorityLetter) throws MailboxStateException;
/**
* Returns the current state of the mailbox as defined by the lifecycle enum {@link State}.
@@ -94,11 +70,4 @@ public interface Mailbox extends MailboxReceiver, MailboxSender {
*/
@Nonnull
State getState();
-
- /**
- * Returns the total capacity of the mailbox.
- *
- * @return the total capacity of the mailbox.
- */
- int capacity();
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
index 0f6bbf0..4f58c4a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
@@ -21,10 +21,13 @@ package org.apache.flink.streaming.runtime.tasks.mailbox;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.locks.Condition;
@@ -38,42 +41,24 @@ import java.util.concurrent.locks.ReentrantLock;
public class MailboxImpl implements Mailbox {
/**
- * The enqueued letters.
- */
- @GuardedBy("lock")
- private final Runnable[] ringBuffer;
-
- /**
* Lock for all concurrent ops.
*/
private final ReentrantLock lock;
/**
- * Condition that is triggered when the buffer is no longer empty.
- */
- @GuardedBy("lock")
- private final Condition notEmpty;
-
- /**
- * Condition that is triggered when the buffer is no longer full.
+ * Internal queue of letters.
*/
@GuardedBy("lock")
- private final Condition notFull;
+ private final LinkedList<Runnable> queue;
/**
- * Index of the ring buffer head.
+ * Condition that is triggered when the mailbox is no longer empty.
*/
@GuardedBy("lock")
- private int headIndex;
-
- /**
- * Index of the ring buffer tail.
- */
- @GuardedBy("lock")
- private int tailIndex;
+ private final Condition notEmpty;
/**
- * Number of letters in the mailbox.
+ * Number of letters in the mailbox. We track it separately from the queue#size to avoid locking on {@link #hasMail()}.
*/
@GuardedBy("lock")
private volatile int count;
@@ -84,24 +69,12 @@ public class MailboxImpl implements Mailbox {
@GuardedBy("lock")
private volatile State state;
- /**
- * A mask to wrap around the indexes of the ring buffer. We use this to avoid ifs or modulo ops.
- */
- private final int moduloMask;
-
public MailboxImpl() {
- this(6); // 2^6 = 64
- }
-
- public MailboxImpl(int capacityPow2) {
- final int capacity = 1 << capacityPow2;
- Preconditions.checkState(capacity > 0);
- this.moduloMask = capacity - 1;
- this.ringBuffer = new Runnable[capacity];
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
- this.notFull = lock.newCondition();
this.state = State.CLOSED;
+ this.queue = new LinkedList<>();
+ this.count = 0;
}
@Override
@@ -114,12 +87,7 @@ public class MailboxImpl implements Mailbox {
final ReentrantLock lock = this.lock;
lock.lock();
try {
- if (isEmpty()) {
- checkTakeStateConditions();
- return Optional.empty();
- } else {
- return Optional.of(takeInternal());
- }
+ return Optional.ofNullable(takeHeadInternal());
} finally {
lock.unlock();
}
@@ -131,11 +99,11 @@ public class MailboxImpl implements Mailbox {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
- while (isEmpty()) {
- checkTakeStateConditions();
+ Runnable headLetter;
+ while ((headLetter = takeHeadInternal()) == null) {
notEmpty.await();
}
- return takeInternal();
+ return headLetter;
} finally {
lock.unlock();
}
@@ -144,31 +112,10 @@ public class MailboxImpl implements Mailbox {
//------------------------------------------------------------------------------------------------------------------
@Override
- public boolean tryPutMail(@Nonnull Runnable letter) throws MailboxStateException {
+ public void putMail(@Nonnull Runnable letter) throws MailboxStateException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
- if (isFull()) {
- checkPutStateConditions();
- return false;
- } else {
- putTailInternal(letter);
- return true;
- }
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public void putMail(@Nonnull Runnable letter) throws InterruptedException, MailboxStateException {
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- while (isFull()) {
- checkPutStateConditions();
- notFull.await();
- }
putTailInternal(letter);
} finally {
lock.unlock();
@@ -177,51 +124,12 @@ public class MailboxImpl implements Mailbox {
//------------------------------------------------------------------------------------------------------------------
- @Nonnull
- @Override
- public List<Runnable> clearAndPut(@Nonnull Runnable priorityLetter) throws MailboxStateException {
- ArrayList<Runnable> droppedLetters = new ArrayList<>(capacity());
-
- lock.lock();
- try {
- // check state first to avoid loosing any letters forever through exception
- checkPutStateConditions();
- dropAllLetters(droppedLetters);
- putTailInternal(priorityLetter);
- } finally {
- lock.unlock();
- }
-
- return droppedLetters;
- }
-
- @Override
- public void putFirst(@Nonnull Runnable priorityLetter) throws InterruptedException, MailboxStateException {
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- while (isFull()) {
- checkPutStateConditions();
- notFull.await();
- }
- putHeadInternal(priorityLetter);
- } finally {
- lock.unlock();
- }
- }
-
@Override
- public boolean tryPutFirst(@Nonnull Runnable priorityLetter) throws MailboxStateException {
+ public void putFirst(@Nonnull Runnable priorityLetter) throws MailboxStateException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
- if (isFull()) {
- checkPutStateConditions();
- return false;
- } else {
- putHeadInternal(priorityLetter);
- return true;
- }
+ putHeadInternal(priorityLetter);
} finally {
lock.unlock();
}
@@ -229,59 +137,42 @@ public class MailboxImpl implements Mailbox {
//------------------------------------------------------------------------------------------------------------------
- private void putHeadInternal(Runnable letter) throws MailboxStateException {
+ private void putHeadInternal(Runnable newHead) throws MailboxStateException {
assert lock.isHeldByCurrentThread();
checkPutStateConditions();
- headIndex = decreaseIndexWithWrapAround(headIndex);
- this.ringBuffer[headIndex] = letter;
- ++count;
+ queue.addFirst(newHead);
+ incrementCountAndCheckOverflow();
notEmpty.signal();
}
- private void putTailInternal(Runnable letter) throws MailboxStateException {
+ private void putTailInternal(Runnable newTail) throws MailboxStateException {
assert lock.isHeldByCurrentThread();
checkPutStateConditions();
- this.ringBuffer[tailIndex] = letter;
- tailIndex = increaseIndexWithWrapAround(tailIndex);
- ++count;
+ queue.addLast(newTail);
+ incrementCountAndCheckOverflow();
notEmpty.signal();
}
- private Runnable takeInternal() throws MailboxStateException {
- assert lock.isHeldByCurrentThread();
- checkTakeStateConditions();
- final Runnable[] buffer = this.ringBuffer;
- Runnable letter = buffer[headIndex];
- buffer[headIndex] = null;
- headIndex = increaseIndexWithWrapAround(headIndex);
- --count;
- notFull.signal();
- return letter;
+ private void incrementCountAndCheckOverflow() {
+ Preconditions.checkState(++count > 0, "Mailbox overflow.");
}
- private void dropAllLetters(List<Runnable> dropInto) {
+ @Nullable
+ private Runnable takeHeadInternal() throws MailboxStateException {
assert lock.isHeldByCurrentThread();
- int localCount = count;
- while (localCount > 0) {
- dropInto.add(ringBuffer[headIndex]);
- ringBuffer[headIndex] = null;
- headIndex = increaseIndexWithWrapAround(headIndex);
- --localCount;
- notFull.signal();
+ checkTakeStateConditions();
+ Runnable oldHead = queue.pollFirst();
+ if (oldHead != null) {
+ --count;
}
- count = 0;
- }
-
- private int increaseIndexWithWrapAround(int old) {
- return (old + 1) & moduloMask;
- }
-
- private int decreaseIndexWithWrapAround(int old) {
- return (old - 1) & moduloMask;
+ return oldHead;
}
- private boolean isFull() {
- return count >= capacity();
+ private void drainAllLetters(List<Runnable> drainInto) {
+ assert lock.isHeldByCurrentThread();
+ drainInto.addAll(queue);
+ queue.clear();
+ count = 0;
}
private boolean isEmpty() {
@@ -331,7 +222,6 @@ public class MailboxImpl implements Mailbox {
if (state == State.OPEN) {
state = State.QUIESCED;
}
- notFull.signalAll();
} finally {
lock.unlock();
}
@@ -340,20 +230,20 @@ public class MailboxImpl implements Mailbox {
@Nonnull
@Override
public List<Runnable> close() {
- final ArrayList<Runnable> droppedLetters = new ArrayList<>(capacity());
-
lock.lock();
try {
- dropAllLetters(droppedLetters);
+ if (state == State.CLOSED) {
+ return Collections.emptyList();
+ }
+ ArrayList<Runnable> droppedLetters = new ArrayList<>(count);
+ drainAllLetters(droppedLetters);
state = State.CLOSED;
// to unblock all
- notFull.signalAll();
notEmpty.signalAll();
+ return droppedLetters;
} finally {
lock.unlock();
}
-
- return droppedLetters;
}
@Nonnull
@@ -361,9 +251,4 @@ public class MailboxImpl implements Mailbox {
public State getState() {
return state;
}
-
- @Override
- public int capacity() {
- return ringBuffer.length;
- }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
index 2a2274a..0560726 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
@@ -27,21 +27,10 @@ import javax.annotation.Nonnull;
public interface MailboxSender {
/**
- * Enqueues the given letter to the mailbox, if capacity is available. On success, this returns <code>true</code>
- * and <code>false</code> if the mailbox was already full.
- *
- * @param letter the letter to enqueue.
- * @return <code>true</code> iff successful.
- * @throws MailboxStateException if the mailbox is quiesced or closed.
- */
- boolean tryPutMail(@Nonnull Runnable letter) throws MailboxStateException;
-
- /**
* Enqueues the given letter to the mailbox. The mailbox is unbounded, so this call never waits for free capacity.
*
* @param letter the letter to enqueue.
- * @throws InterruptedException on interruption.
* @throws MailboxStateException if the mailbox is quiesced or closed.
*/
- void putMail(@Nonnull Runnable letter) throws InterruptedException, MailboxStateException;
+ void putMail(@Nonnull Runnable letter) throws MailboxStateException;
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutor.java
index 1848e77..eeca9c2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutor.java
@@ -31,10 +31,7 @@ import java.util.concurrent.RejectedExecutionException;
public interface MailboxExecutor extends Executor {
/**
- * Executes the given command at some time in the future in the mailbox thread. This call can block when the
- * mailbox is currently full. Therefore, this method must not be called from the mailbox thread itself as this
- * can cause a deadlock. Instead, if the caller is already in the mailbox thread, the command should just be
- * executed directly or use the non-blocking {@link #tryExecute(Runnable)}.
+ * Executes the given command at some time in the future in the mailbox thread.
*
* @param command the runnable task to add to the mailbox for execution.
* @throws RejectedExecutionException if this task cannot be accepted for execution, e.g. because the mailbox is
@@ -44,18 +41,6 @@ public interface MailboxExecutor extends Executor {
void execute(@Nonnull Runnable command) throws RejectedExecutionException;
/**
- * Attempts to enqueue the given command in the mailbox for execution. On success, the method returns true. If
- * the mailbox is full, this method returns immediately without adding the command and returns false.
- *
- * @param command the runnable task to add to the mailbox for execution.
- * @return true if the command was added to the mailbox. False if the command could not be added because the mailbox
- * was full.
- * @throws RejectedExecutionException if this task cannot be accepted for execution, e.g. because the mailbox is
- * quiesced or closed.
- */
- boolean tryExecute(Runnable command) throws RejectedExecutionException;
-
- /**
* This method starts running the command at the head of the mailbox and is intended to be used by the mailbox
* thread to yield from a currently ongoing action to another command. The method blocks until another command to
* run is available in the mailbox and must only be called from the mailbox thread. Must only be called from the
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImpl.java
index 944b7a9..5c3cd95 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImpl.java
@@ -53,27 +53,14 @@ public class MailboxExecutorServiceImpl extends AbstractExecutorService implemen
@Override
public void execute(@Nonnull Runnable command) {
- checkIsNotMailboxThread();
try {
mailbox.putMail(command);
- } catch (InterruptedException irex) {
- Thread.currentThread().interrupt();
- throw new RejectedExecutionException("Sender thread was interrupted while blocking on mailbox.", irex);
} catch (MailboxStateException mbex) {
throw new RejectedExecutionException(mbex);
}
}
@Override
- public boolean tryExecute(Runnable command) {
- try {
- return mailbox.tryPutMail(command);
- } catch (MailboxStateException e) {
- throw new RejectedExecutionException(e);
- }
- }
-
- @Override
public void yield() throws InterruptedException, IllegalStateException {
checkIsMailboxThread();
try {
@@ -147,11 +134,4 @@ public class MailboxExecutorServiceImpl extends AbstractExecutorService implemen
"Illegal thread detected. This method must be called from inside the mailbox thread!");
}
}
-
- private void checkIsNotMailboxThread() {
- if (isMailboxThread()) {
- throw new IllegalStateException(
- "Illegal thread detected. This method must NOT be called from inside the mailbox thread!");
- }
- }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java
index 77906f7..6fc8da0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java
@@ -114,7 +114,11 @@ public class MailboxProcessor {
* {@link java.util.concurrent.RunnableFuture} that are still contained in the mailbox.
*/
public void close() {
- FutureUtils.cancelRunnableFutures(mailboxExecutor.shutdownNow());
+ List<Runnable> droppedLetters = mailboxExecutor.shutdownNow();
+ if (!droppedLetters.isEmpty()) {
+ LOG.debug("Closing the mailbox dropped letters {}.", droppedLetters);
+ FutureUtils.cancelRunnableFutures(droppedLetters);
+ }
}
/**
@@ -138,19 +142,11 @@ public class MailboxProcessor {
}
/**
- * Cancels the mailbox loop execution. All pending mailbox actions will not be executed anymore, if they are
- * instance of {@link java.util.concurrent.RunnableFuture}, they will be cancelled.
- */
- public void cancelMailboxExecution() {
- clearMailboxAndRunPriorityAction(mailboxPoisonLetter);
- }
-
- /**
* Reports a throwable for rethrowing from the mailbox thread. The rethrowing letter is enqueued at the head of the mailbox.
* @param throwable to report by rethrowing from the mailbox loop.
*/
public void reportThrowable(Throwable throwable) {
- clearMailboxAndRunPriorityAction(() -> {
+ sendPriorityLetter(() -> {
throw new WrappingRuntimeException(throwable);
});
}
@@ -159,17 +155,14 @@ public class MailboxProcessor {
* This method must be called to end the stream task when all actions for the tasks have been performed.
*/
public void allActionsCompleted() {
+ sendPriorityLetter(mailboxPoisonLetter);
+ }
+
+ private void sendPriorityLetter(Runnable priorityLetter) {
try {
- if (mailboxExecutor.isMailboxThread()) {
- mailboxLoopRunning = false;
- ensureControlFlowSignalCheck();
- } else {
- mailbox.putFirst(mailboxPoisonLetter);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ mailbox.putFirst(priorityLetter);
} catch (MailboxStateException me) {
- LOG.debug("Action context could not submit poison letter to mailbox.", me);
+ LOG.debug("Action context could not submit priority letter to mailbox.", me);
}
}
@@ -236,20 +229,7 @@ public class MailboxProcessor {
private void ensureControlFlowSignalCheck() {
// Make sure that mailbox#hasMail is true via a dummy letter so that the flag change is noticed.
if (!mailbox.hasMail()) {
- try {
- mailbox.tryPutMail(() -> {});
- } catch (MailboxStateException me) {
- LOG.debug("Mailbox closed when trying to submit letter for control flow signal.", me);
- }
- }
- }
-
- private void clearMailboxAndRunPriorityAction(Runnable priorityLetter) {
- try {
- List<Runnable> droppedRunnables = mailbox.clearAndPut(priorityLetter);
- FutureUtils.cancelRunnableFutures(droppedRunnables);
- } catch (MailboxStateException msex) {
- LOG.debug("Mailbox already closed in cancel().", msex);
+ sendPriorityLetter(() -> {});
}
}
@@ -283,10 +263,14 @@ public class MailboxProcessor {
@Override
public void resume() {
- Preconditions.checkState(
- mailboxExecutor.isMailboxThread(),
- "SuspendedMailboxDefaultAction::resume resume must only be called from the mailbox-thread!");
+ if (mailboxExecutor.isMailboxThread()) {
+ resumeInternal();
+ } else {
+ sendPriorityLetter(this::resumeInternal);
+ }
+ }
+ private void resumeInternal() {
if (suspendedDefaultAction == this) {
suspendedDefaultAction = null;
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/SuspendedMailboxDefaultAction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/SuspendedMailboxDefaultAction.java
index ca31254..6b3f32c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/SuspendedMailboxDefaultAction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/SuspendedMailboxDefaultAction.java
@@ -24,7 +24,7 @@ package org.apache.flink.streaming.runtime.tasks.mailbox.execution;
public interface SuspendedMailboxDefaultAction {
/**
- * Resume execution of the default action. Must only be called from the mailbox thread!.
+ * Resume execution of the default action.
*/
void resume();
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
index 1a48136..b7b478e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.runtime.tasks.mailbox;
-import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.RunnableWithException;
@@ -30,7 +29,6 @@ import org.junit.Before;
import org.junit.Test;
import java.util.LinkedList;
-import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
@@ -42,9 +40,6 @@ import java.util.function.Consumer;
public class MailboxImplTest {
private static final Runnable POISON_LETTER = () -> {};
- private static final int CAPACITY_POW_2 = 2;
- private static final int CAPACITY = 1 << CAPACITY_POW_2;
-
/**
* Object under test.
*/
@@ -52,7 +47,7 @@ public class MailboxImplTest {
@Before
public void setUp() {
- mailbox = new MailboxImpl(CAPACITY_POW_2);
+ mailbox = new MailboxImpl();
mailbox.open();
}
@@ -61,26 +56,6 @@ public class MailboxImplTest {
mailbox.close();
}
- /**
- * Test for #clearAndPut should remove other pending events and enqueue directly to the head of the mailbox queue.
- */
- @Test
- public void testClearAndPut() throws Exception {
-
- Runnable letterInstance = () -> {};
-
- for (int i = 0; i < CAPACITY; ++i) {
- Assert.assertTrue(mailbox.tryPutMail(letterInstance));
- }
-
- List<Runnable> droppedLetters = mailbox.clearAndPut(POISON_LETTER);
-
- Assert.assertTrue(mailbox.hasMail());
- Assert.assertEquals(POISON_LETTER, mailbox.tryTakeMail().get());
- Assert.assertFalse(mailbox.hasMail());
- Assert.assertEquals(CAPACITY, droppedLetters.size());
- }
-
@Test
public void testPutAsHead() throws Exception {
@@ -88,38 +63,16 @@ public class MailboxImplTest {
Runnable instanceB = () -> {};
Runnable instanceC = () -> {};
Runnable instanceD = () -> {};
- Runnable instanceE = () -> {};
+ mailbox.putMail(instanceC);
+ mailbox.putFirst(instanceB);
mailbox.putMail(instanceD);
- mailbox.tryPutFirst(instanceC);
- mailbox.putMail(instanceE);
mailbox.putFirst(instanceA);
- OneShotLatch latch = new OneShotLatch();
- Thread blockingPut = new Thread(() -> {
- // ensure we are full
- try {
- if (!mailbox.tryPutFirst(() -> { })) {
- latch.trigger();
-
- mailbox.putFirst(instanceB);
-
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (MailboxStateException ignore) {
- }
- });
-
- blockingPut.start();
- latch.await();
-
Assert.assertSame(instanceA, mailbox.takeMail());
- blockingPut.join();
Assert.assertSame(instanceB, mailbox.takeMail());
Assert.assertSame(instanceC, mailbox.takeMail());
Assert.assertSame(instanceD, mailbox.takeMail());
- Assert.assertSame(instanceE, mailbox.takeMail());
Assert.assertFalse(mailbox.tryTakeMail().isPresent());
}
@@ -129,15 +82,13 @@ public class MailboxImplTest {
final Queue<Runnable> testObjects = new LinkedList<>();
Assert.assertFalse(mailbox.hasMail());
- for (int i = 0; i < CAPACITY; ++i) {
+ for (int i = 0; i < 10; ++i) {
Runnable letter = () -> {};
testObjects.add(letter);
- Assert.assertTrue(mailbox.tryPutMail(letter));
+ mailbox.putMail(letter);
Assert.assertTrue(mailbox.hasMail());
}
- Assert.assertFalse(mailbox.tryPutMail(() -> {}));
-
while (!testObjects.isEmpty()) {
Assert.assertEquals(testObjects.remove(), mailbox.tryTakeMail().get());
Assert.assertEquals(!testObjects.isEmpty(), mailbox.hasMail());
@@ -164,9 +115,7 @@ public class MailboxImplTest {
}
return optionalLetter.get();
}),
- ((mailbox, runnable) -> {
- while (!mailbox.tryPutMail(runnable)) {}
- }));
+ MailboxSender::putMail);
}
/**
@@ -176,7 +125,7 @@ public class MailboxImplTest {
public void testCloseUnblocks() throws InterruptedException {
testAllPuttingUnblocksInternal(Mailbox::close);
setUp();
- testUnblocksInternal(() -> mailbox.takeMail(), Mailbox::close, MailboxStateException.class);
+ testUnblocksInternal(() -> mailbox.takeMail(), Mailbox::close);
}
/**
@@ -218,16 +167,6 @@ public class MailboxImplTest {
private void testLifecyclePuttingInternal() throws Exception {
try {
- mailbox.tryPutMail(() -> {});
- Assert.fail();
- } catch (MailboxStateException ignore) {
- }
- try {
- mailbox.tryPutFirst(() -> {});
- Assert.fail();
- } catch (MailboxStateException ignore) {
- }
- try {
mailbox.putMail(() -> {});
Assert.fail();
} catch (MailboxStateException ignore) {
@@ -240,18 +179,15 @@ public class MailboxImplTest {
}
private void testAllPuttingUnblocksInternal(Consumer<Mailbox> unblockMethod) throws InterruptedException {
- testUnblocksInternal(() -> mailbox.putMail(() -> {}), unblockMethod, MailboxStateException.class);
- setUp();
- testUnblocksInternal(() -> mailbox.putFirst(() -> {}), unblockMethod, MailboxStateException.class);
+ testUnblocksInternal(() -> mailbox.putMail(() -> {}), unblockMethod);
setUp();
- testUnblocksInternal(() -> mailbox.clearAndPut(() -> {}), unblockMethod, MailboxStateException.class);
+ testUnblocksInternal(() -> mailbox.putFirst(() -> {}), unblockMethod);
}
private void testUnblocksInternal(
RunnableWithException testMethod,
- Consumer<Mailbox> unblockMethod,
- Class<?> expectedExceptionClass) throws InterruptedException {
- final Thread[] blockedThreads = new Thread[CAPACITY * 2];
+ Consumer<Mailbox> unblockMethod) throws InterruptedException {
+ final Thread[] blockedThreads = new Thread[8];
final Exception[] exceptions = new Exception[blockedThreads.length];
CountDownLatch countDownLatch = new CountDownLatch(blockedThreads.length);
@@ -280,7 +216,7 @@ public class MailboxImplTest {
}
for (Exception exception : exceptions) {
- Assert.assertEquals(expectedExceptionClass, exception.getClass());
+ Assert.assertEquals(MailboxStateException.class, exception.getClass());
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImplTest.java
index 92e106d..7c25e5f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImplTest.java
@@ -75,13 +75,13 @@ public class MailboxExecutorServiceImplTest {
Assert.assertFalse(mailboxExecutorService.isShutdown());
Assert.assertFalse(mailboxExecutorService.isTerminated());
final TestRunnable testRunnable = new TestRunnable();
- Assert.assertTrue(mailboxExecutorService.tryExecute(testRunnable));
+ mailboxExecutorService.execute(testRunnable);
Assert.assertEquals(testRunnable, mailbox.tryTakeMail().get());
CompletableFuture.runAsync(() -> mailboxExecutorService.execute(testRunnable), otherThreadExecutor).get();
Assert.assertEquals(testRunnable, mailbox.takeMail());
final TestRunnable yieldRun = new TestRunnable();
final TestRunnable leftoverRun = new TestRunnable();
- Assert.assertTrue(mailboxExecutorService.tryExecute(yieldRun));
+ mailboxExecutorService.execute(yieldRun);
Future<?> leftoverFuture = CompletableFuture.supplyAsync(
() -> mailboxExecutorService.submit(leftoverRun), otherThreadExecutor).get();
mailboxExecutorService.shutdown();
@@ -96,7 +96,7 @@ public class MailboxExecutorServiceImplTest {
}
try {
- CompletableFuture.runAsync(() -> mailboxExecutorService.tryExecute(testRunnable), otherThreadExecutor).get();
+ CompletableFuture.runAsync(() -> mailboxExecutorService.execute(testRunnable), otherThreadExecutor).get();
Assert.fail("execution should not work after shutdown().");
} catch (ExecutionException expected) {
Assert.assertTrue(expected.getCause() instanceof RejectedExecutionException);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessorTest.java
index 80c71dd..46d5231 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessorTest.java
@@ -38,7 +38,7 @@ public class MailboxProcessorTest {
public void testRejectIfNotOpen() {
MailboxProcessor mailboxProcessor = new MailboxProcessor((ctx) -> {});
try {
- mailboxProcessor.getMailboxExecutor().tryExecute(() -> {});
+ mailboxProcessor.getMailboxExecutor().execute(() -> {});
Assert.fail("Should not be able to accept runnables if not opened.");
} catch (RejectedExecutionException expected) {
}
@@ -49,11 +49,11 @@ public class MailboxProcessorTest {
MailboxProcessor mailboxProcessor = new MailboxProcessor((ctx) -> {});
FutureTask<Void> testRunnableFuture = new FutureTask<>(() -> {}, null);
mailboxProcessor.open();
- mailboxProcessor.getMailboxExecutor().tryExecute(testRunnableFuture);
+ mailboxProcessor.getMailboxExecutor().execute(testRunnableFuture);
mailboxProcessor.prepareClose();
try {
- mailboxProcessor.getMailboxExecutor().tryExecute(() -> {});
+ mailboxProcessor.getMailboxExecutor().execute(() -> {});
Assert.fail("Should not be able to accept runnables if not opened.");
} catch (RejectedExecutionException expected) {
}
@@ -123,11 +123,12 @@ public class MailboxProcessorTest {
}
};
- start(mailboxThread);
+ MailboxProcessor mailboxProcessor = start(mailboxThread);
actionSuspendedLatch.await();
Assert.assertEquals(blockAfterInvocations, counter.get());
- suspendedActionRef.get().resume();
+ SuspendedMailboxDefaultAction suspendedMailboxDefaultAction = suspendedActionRef.get();
+ mailboxProcessor.getMailboxExecutor().execute(suspendedMailboxDefaultAction::resume);
stop(mailboxThread);
Assert.assertEquals(totalInvocations, counter.get());
}
@@ -170,7 +171,7 @@ public class MailboxProcessorTest {
final SuspendedMailboxDefaultAction resume =
suspendedActionRef.getAndSet(null);
if (resume != null) {
- resume.resume();
+ mailboxProcessor.getMailboxExecutor().execute(resume::resume);
} else {
try {
mailboxProcessor.getMailboxExecutor().execute(() -> { });
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/TestMailboxExecutor.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/TestMailboxExecutor.java
index 31126a8..979fe58 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/TestMailboxExecutor.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/TestMailboxExecutor.java
@@ -46,12 +46,6 @@ public class TestMailboxExecutor implements MailboxExecutor {
}
@Override
- public boolean tryExecute(Runnable command) {
- execute(command);
- return true;
- }
-
- @Override
public void yield() throws InterruptedException {
synchronized (lock) {
lock.wait(1);