You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2019/07/12 13:40:03 UTC

[flink] 03/03: [FLINK-12804] Change mailbox implementation from bounded to unbounded

This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6cf98b671d889f1003172bc656ded69d1aab9235
Author: Stefan Richter <s....@data-artisans.com>
AuthorDate: Wed Jul 3 13:32:46 2019 +0200

    [FLINK-12804] Change mailbox implementation from bounded to unbounded
    
    This closes #8692.
---
 .../flink/streaming/runtime/tasks/StreamTask.java  |   2 +-
 .../streaming/runtime/tasks/mailbox/Mailbox.java   |  37 +---
 .../runtime/tasks/mailbox/MailboxImpl.java         | 201 +++++----------------
 .../runtime/tasks/mailbox/MailboxSender.java       |  13 +-
 .../tasks/mailbox/execution/MailboxExecutor.java   |  17 +-
 .../execution/MailboxExecutorServiceImpl.java      |  20 --
 .../tasks/mailbox/execution/MailboxProcessor.java  |  56 ++----
 .../execution/SuspendedMailboxDefaultAction.java   |   2 +-
 .../runtime/tasks/mailbox/MailboxImplTest.java     |  88 ++-------
 .../execution/MailboxExecutorServiceImplTest.java  |   6 +-
 .../mailbox/execution/MailboxProcessorTest.java    |  13 +-
 .../mailbox/execution/TestMailboxExecutor.java     |   6 -
 12 files changed, 92 insertions(+), 369 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index a626bba..52e2011 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -484,7 +484,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			cancelTask();
 		}
 		finally {
-			mailboxProcessor.cancelMailboxExecution();
+			mailboxProcessor.allActionsCompleted();
 			cancelables.close();
 		}
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
index 9b2d95f..deb69e0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
@@ -23,7 +23,7 @@ import javax.annotation.Nonnull;
 import java.util.List;
 
 /**
- * A mailbox is basically a blocking queue for inter-thread message exchange in form of {@link Runnable} objects between
+ * A mailbox is basically a queue for inter-thread message exchange in form of {@link Runnable} objects between
  * multiple producer threads and a single consumer. This has a lifecycle of closed -> open -> (quiesced) -> closed.
  */
 public interface Mailbox extends MailboxReceiver, MailboxSender {
@@ -56,36 +56,12 @@ public interface Mailbox extends MailboxReceiver, MailboxSender {
 	List<Runnable> close();
 
 	/**
-	 * The effect of this is that all pending letters in the mailbox are dropped and the given priorityLetter
-	 * is enqueued to the head of the mailbox. Dropped letters are returned. This method should only be invoked
-	 * by code that has ownership of the mailbox object and only rarely used, e.g. to submit special events like
-	 * shutting down the mailbox loop.
-	 *
-	 * @param priorityLetter action to enqueue atomically after the mailbox was cleared.
-	 * @throws MailboxStateException if the mailbox is quiesced or closed.
-	 */
-	@Nonnull
-	List<Runnable> clearAndPut(@Nonnull Runnable priorityLetter) throws MailboxStateException;
-
-	/**
-	 * Adds the given action to the head of the mailbox. This method will block if the mailbox is full and
-	 * should therefore only be called from outside the mailbox main-thread to avoid deadlocks.
-	 *
-	 * @param priorityLetter action to enqueue to the head of the mailbox.
-	 * @throws InterruptedException on interruption.
-	 * @throws MailboxStateException if the mailbox is quiesced or closed.
-	 */
-	void putFirst(@Nonnull Runnable priorityLetter) throws InterruptedException, MailboxStateException;
-
-	/**
-	 * Adds the given action to the head of the mailbox if the mailbox is not full. Returns true if the letter
-	 * was successfully added to the mailbox.
+	 * Adds the given action to the head of the mailbox.
 	 *
 	 * @param priorityLetter action to enqueue to the head of the mailbox.
-	 * @return true if the letter was successfully added.
 	 * @throws MailboxStateException if the mailbox is quiesced or closed.
 	 */
-	boolean tryPutFirst(@Nonnull Runnable priorityLetter) throws MailboxStateException;
+	void putFirst(@Nonnull Runnable priorityLetter) throws MailboxStateException;
 
 	/**
 	 * Returns the current state of the mailbox as defined by the lifecycle enum {@link State}.
@@ -94,11 +70,4 @@ public interface Mailbox extends MailboxReceiver, MailboxSender {
 	 */
 	@Nonnull
 	State getState();
-
-	/**
-	 * Returns the total capacity of the mailbox.
-	 *
-	 * @return the total capacity of the mailbox.
-	 */
-	int capacity();
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
index 0f6bbf0..4f58c4a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
@@ -21,10 +21,13 @@ package org.apache.flink.streaming.runtime.tasks.mailbox;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.locks.Condition;
@@ -38,42 +41,24 @@ import java.util.concurrent.locks.ReentrantLock;
 public class MailboxImpl implements Mailbox {
 
 	/**
-	 * The enqueued letters.
-	 */
-	@GuardedBy("lock")
-	private final Runnable[] ringBuffer;
-
-	/**
 	 * Lock for all concurrent ops.
 	 */
 	private final ReentrantLock lock;
 
 	/**
-	 * Condition that is triggered when the buffer is no longer empty.
-	 */
-	@GuardedBy("lock")
-	private final Condition notEmpty;
-
-	/**
-	 * Condition that is triggered when the buffer is no longer full.
+	 * Internal queue of letters.
 	 */
 	@GuardedBy("lock")
-	private final Condition notFull;
+	private final LinkedList<Runnable> queue;
 
 	/**
-	 * Index of the ring buffer head.
+	 * Condition that is triggered when the mailbox is no longer empty.
 	 */
 	@GuardedBy("lock")
-	private int headIndex;
-
-	/**
-	 * Index of the ring buffer tail.
-	 */
-	@GuardedBy("lock")
-	private int tailIndex;
+	private final Condition notEmpty;
 
 	/**
-	 * Number of letters in the mailbox.
+	 * Number of letters in the mailbox. We track it separately from the queue#size to avoid locking on {@link #hasMail()}.
 	 */
 	@GuardedBy("lock")
 	private volatile int count;
@@ -84,24 +69,12 @@ public class MailboxImpl implements Mailbox {
 	@GuardedBy("lock")
 	private volatile State state;
 
-	/**
-	 * A mask to wrap around the indexes of the ring buffer. We use this to avoid ifs or modulo ops.
-	 */
-	private final int moduloMask;
-
 	public MailboxImpl() {
-		this(6); // 2^6 = 64
-	}
-
-	public MailboxImpl(int capacityPow2) {
-		final int capacity = 1 << capacityPow2;
-		Preconditions.checkState(capacity > 0);
-		this.moduloMask = capacity - 1;
-		this.ringBuffer = new Runnable[capacity];
 		this.lock = new ReentrantLock();
 		this.notEmpty = lock.newCondition();
-		this.notFull = lock.newCondition();
 		this.state = State.CLOSED;
+		this.queue = new LinkedList<>();
+		this.count = 0;
 	}
 
 	@Override
@@ -114,12 +87,7 @@ public class MailboxImpl implements Mailbox {
 		final ReentrantLock lock = this.lock;
 		lock.lock();
 		try {
-			if (isEmpty()) {
-				checkTakeStateConditions();
-				return Optional.empty();
-			} else {
-				return Optional.of(takeInternal());
-			}
+			return Optional.ofNullable(takeHeadInternal());
 		} finally {
 			lock.unlock();
 		}
@@ -131,11 +99,11 @@ public class MailboxImpl implements Mailbox {
 		final ReentrantLock lock = this.lock;
 		lock.lockInterruptibly();
 		try {
-			while (isEmpty()) {
-				checkTakeStateConditions();
+			Runnable headLetter;
+			while ((headLetter = takeHeadInternal()) == null) {
 				notEmpty.await();
 			}
-			return takeInternal();
+			return headLetter;
 		} finally {
 			lock.unlock();
 		}
@@ -144,31 +112,10 @@ public class MailboxImpl implements Mailbox {
 	//------------------------------------------------------------------------------------------------------------------
 
 	@Override
-	public boolean tryPutMail(@Nonnull Runnable letter) throws MailboxStateException {
+	public void putMail(@Nonnull Runnable letter) throws MailboxStateException {
 		final ReentrantLock lock = this.lock;
 		lock.lock();
 		try {
-			if (isFull()) {
-				checkPutStateConditions();
-				return false;
-			} else {
-				putTailInternal(letter);
-				return true;
-			}
-		} finally {
-			lock.unlock();
-		}
-	}
-
-	@Override
-	public void putMail(@Nonnull Runnable letter) throws InterruptedException, MailboxStateException {
-		final ReentrantLock lock = this.lock;
-		lock.lockInterruptibly();
-		try {
-			while (isFull()) {
-				checkPutStateConditions();
-				notFull.await();
-			}
 			putTailInternal(letter);
 		} finally {
 			lock.unlock();
@@ -177,51 +124,12 @@ public class MailboxImpl implements Mailbox {
 
 	//------------------------------------------------------------------------------------------------------------------
 
-	@Nonnull
-	@Override
-	public List<Runnable> clearAndPut(@Nonnull Runnable priorityLetter) throws MailboxStateException {
-		ArrayList<Runnable> droppedLetters = new ArrayList<>(capacity());
-
-		lock.lock();
-		try {
-			// check state first to avoid loosing any letters forever through exception
-			checkPutStateConditions();
-			dropAllLetters(droppedLetters);
-			putTailInternal(priorityLetter);
-		} finally {
-			lock.unlock();
-		}
-
-		return droppedLetters;
-	}
-
-	@Override
-	public void putFirst(@Nonnull Runnable priorityLetter) throws InterruptedException, MailboxStateException {
-		final ReentrantLock lock = this.lock;
-		lock.lockInterruptibly();
-		try {
-			while (isFull()) {
-				checkPutStateConditions();
-				notFull.await();
-			}
-			putHeadInternal(priorityLetter);
-		} finally {
-			lock.unlock();
-		}
-	}
-
 	@Override
-	public boolean tryPutFirst(@Nonnull Runnable priorityLetter) throws MailboxStateException {
+	public void putFirst(@Nonnull Runnable priorityLetter) throws MailboxStateException {
 		final ReentrantLock lock = this.lock;
 		lock.lock();
 		try {
-			if (isFull()) {
-				checkPutStateConditions();
-				return false;
-			} else {
-				putHeadInternal(priorityLetter);
-				return true;
-			}
+			putHeadInternal(priorityLetter);
 		} finally {
 			lock.unlock();
 		}
@@ -229,59 +137,42 @@ public class MailboxImpl implements Mailbox {
 
 	//------------------------------------------------------------------------------------------------------------------
 
-	private void putHeadInternal(Runnable letter) throws MailboxStateException {
+	private void putHeadInternal(Runnable newHead) throws MailboxStateException {
 		assert lock.isHeldByCurrentThread();
 		checkPutStateConditions();
-		headIndex = decreaseIndexWithWrapAround(headIndex);
-		this.ringBuffer[headIndex] = letter;
-		++count;
+		queue.addFirst(newHead);
+		incrementCountAndCheckOverflow();
 		notEmpty.signal();
 	}
 
-	private void putTailInternal(Runnable letter) throws MailboxStateException {
+	private void putTailInternal(Runnable newTail) throws MailboxStateException {
 		assert lock.isHeldByCurrentThread();
 		checkPutStateConditions();
-		this.ringBuffer[tailIndex] = letter;
-		tailIndex = increaseIndexWithWrapAround(tailIndex);
-		++count;
+		queue.addLast(newTail);
+		incrementCountAndCheckOverflow();
 		notEmpty.signal();
 	}
 
-	private Runnable takeInternal() throws MailboxStateException {
-		assert lock.isHeldByCurrentThread();
-		checkTakeStateConditions();
-		final Runnable[] buffer = this.ringBuffer;
-		Runnable letter = buffer[headIndex];
-		buffer[headIndex] = null;
-		headIndex = increaseIndexWithWrapAround(headIndex);
-		--count;
-		notFull.signal();
-		return letter;
+	private void incrementCountAndCheckOverflow() {
+		Preconditions.checkState(++count > 0, "Mailbox overflow.");
 	}
 
-	private void dropAllLetters(List<Runnable> dropInto) {
+	@Nullable
+	private Runnable takeHeadInternal() throws MailboxStateException {
 		assert lock.isHeldByCurrentThread();
-		int localCount = count;
-		while (localCount > 0) {
-			dropInto.add(ringBuffer[headIndex]);
-			ringBuffer[headIndex] = null;
-			headIndex = increaseIndexWithWrapAround(headIndex);
-			--localCount;
-			notFull.signal();
+		checkTakeStateConditions();
+		Runnable oldHead = queue.pollFirst();
+		if (oldHead != null) {
+			--count;
 		}
-		count = 0;
-	}
-
-	private int increaseIndexWithWrapAround(int old) {
-		return (old + 1) & moduloMask;
-	}
-
-	private int decreaseIndexWithWrapAround(int old) {
-		return (old - 1) & moduloMask;
+		return oldHead;
 	}
 
-	private boolean isFull() {
-		return count >= capacity();
+	private void drainAllLetters(List<Runnable> drainInto) {
+		assert lock.isHeldByCurrentThread();
+		drainInto.addAll(queue);
+		queue.clear();
+		count = 0;
 	}
 
 	private boolean isEmpty() {
@@ -331,7 +222,6 @@ public class MailboxImpl implements Mailbox {
 			if (state == State.OPEN) {
 				state = State.QUIESCED;
 			}
-			notFull.signalAll();
 		} finally {
 			lock.unlock();
 		}
@@ -340,20 +230,20 @@ public class MailboxImpl implements Mailbox {
 	@Nonnull
 	@Override
 	public List<Runnable> close() {
-		final ArrayList<Runnable> droppedLetters = new ArrayList<>(capacity());
-
 		lock.lock();
 		try {
-			dropAllLetters(droppedLetters);
+			if (state == State.CLOSED) {
+				return Collections.emptyList();
+			}
+			ArrayList<Runnable> droppedLetters = new ArrayList<>(count);
+			drainAllLetters(droppedLetters);
 			state = State.CLOSED;
 			// to unblock all
-			notFull.signalAll();
 			notEmpty.signalAll();
+			return droppedLetters;
 		} finally {
 			lock.unlock();
 		}
-
-		return droppedLetters;
 	}
 
 	@Nonnull
@@ -361,9 +251,4 @@ public class MailboxImpl implements Mailbox {
 	public State getState() {
 		return state;
 	}
-
-	@Override
-	public int capacity() {
-		return ringBuffer.length;
-	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
index 2a2274a..0560726 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
@@ -27,21 +27,10 @@ import javax.annotation.Nonnull;
 public interface MailboxSender {
 
 	/**
-	 * Enqueues the given letter to the mailbox, if capacity is available. On success, this returns <code>true</code>
-	 * and <code>false</code> if the mailbox was already full.
-	 *
-	 * @param letter the letter to enqueue.
-	 * @return <code>true</code> iff successful.
-	 * @throws MailboxStateException if the mailbox is quiesced or closed.
-	 */
-	boolean tryPutMail(@Nonnull Runnable letter) throws MailboxStateException;
-
-	/**
 	 * Enqueues the given letter to the mailbox and blocks until there is capacity for a successful put.
 	 *
 	 * @param letter the letter to enqueue.
-	 * @throws InterruptedException on interruption.
 	 * @throws MailboxStateException if the mailbox is quiesced or closed.
 	 */
-	void putMail(@Nonnull Runnable letter) throws InterruptedException,  MailboxStateException;
+	void putMail(@Nonnull Runnable letter) throws  MailboxStateException;
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutor.java
index 1848e77..eeca9c2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutor.java
@@ -31,10 +31,7 @@ import java.util.concurrent.RejectedExecutionException;
 public interface MailboxExecutor extends Executor {
 
 	/**
-	 * Executes the given command at some time in the future in the mailbox thread. This call can block when the
-	 * mailbox is currently full. Therefore, this method must not be called from the mailbox thread itself as this
-	 * can cause a deadlock. Instead, if the caller is already in the mailbox thread, the command should just be
-	 * executed directly or use the non-blocking {@link #tryExecute(Runnable)}.
+	 * Executes the given command at some time in the future in the mailbox thread.
 	 *
 	 * @param command the runnable task to add to the mailbox for execution.
 	 * @throws RejectedExecutionException if this task cannot be accepted for execution, e.g. because the mailbox is
@@ -44,18 +41,6 @@ public interface MailboxExecutor extends Executor {
 	void execute(@Nonnull Runnable command) throws RejectedExecutionException;
 
 	/**
-	 * Attempts to enqueue the given command in the mailbox for execution. On success, the method returns true. If
-	 * the mailbox is full, this method returns immediately without adding the command and returns false.
-	 *
-	 * @param command the runnable task to add to the mailbox for execution.
-	 * @return true if the command was added to the mailbox. False if the command could not be added because the mailbox
-	 * was full.
-	 * @throws RejectedExecutionException if this task cannot be accepted for execution, e.g. because the mailbox is
-	 *                                    quiesced or closed.
-	 */
-	boolean tryExecute(Runnable command) throws RejectedExecutionException;
-
-	/**
 	 * This methods starts running the command at the head of the mailbox and is intended to be used by the mailbox
 	 * thread to yield from a currently ongoing action to another command. The method blocks until another command to
 	 * run is available in the mailbox and must only be called from the mailbox thread. Must only be called from the
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImpl.java
index 944b7a9..5c3cd95 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImpl.java
@@ -53,27 +53,14 @@ public class MailboxExecutorServiceImpl extends AbstractExecutorService implemen
 
 	@Override
 	public void execute(@Nonnull Runnable command) {
-		checkIsNotMailboxThread();
 		try {
 			mailbox.putMail(command);
-		} catch (InterruptedException irex) {
-			Thread.currentThread().interrupt();
-			throw new RejectedExecutionException("Sender thread was interrupted while blocking on mailbox.", irex);
 		} catch (MailboxStateException mbex) {
 			throw new RejectedExecutionException(mbex);
 		}
 	}
 
 	@Override
-	public boolean tryExecute(Runnable command) {
-		try {
-			return mailbox.tryPutMail(command);
-		} catch (MailboxStateException e) {
-			throw new RejectedExecutionException(e);
-		}
-	}
-
-	@Override
 	public void yield() throws InterruptedException, IllegalStateException {
 		checkIsMailboxThread();
 		try {
@@ -147,11 +134,4 @@ public class MailboxExecutorServiceImpl extends AbstractExecutorService implemen
 				"Illegal thread detected. This method must be called from inside the mailbox thread!");
 		}
 	}
-
-	private void checkIsNotMailboxThread() {
-		if (isMailboxThread()) {
-			throw new IllegalStateException(
-				"Illegal thread detected. This method must NOT be called from inside the mailbox thread!");
-		}
-	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java
index 77906f7..6fc8da0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java
@@ -114,7 +114,11 @@ public class MailboxProcessor {
 	 * {@link java.util.concurrent.RunnableFuture} that are still contained in the mailbox.
 	 */
 	public void close() {
-		FutureUtils.cancelRunnableFutures(mailboxExecutor.shutdownNow());
+		List<Runnable> droppedLetters = mailboxExecutor.shutdownNow();
+		if (!droppedLetters.isEmpty()) {
+			LOG.debug("Closing the mailbox dropped letters {}.", droppedLetters);
+			FutureUtils.cancelRunnableFutures(droppedLetters);
+		}
 	}
 
 	/**
@@ -138,19 +142,11 @@ public class MailboxProcessor {
 	}
 
 	/**
-	 * Cancels the mailbox loop execution. All pending mailbox actions will not be executed anymore, if they are
-	 * instance of {@link java.util.concurrent.RunnableFuture}, they will be cancelled.
-	 */
-	public void cancelMailboxExecution() {
-		clearMailboxAndRunPriorityAction(mailboxPoisonLetter);
-	}
-
-	/**
 	 * Reports a throwable for rethrowing from the mailbox thread. This will clear and cancel all other pending letters.
 	 * @param throwable to report by rethrowing from the mailbox loop.
 	 */
 	public void reportThrowable(Throwable throwable) {
-		clearMailboxAndRunPriorityAction(() -> {
+		sendPriorityLetter(() -> {
 			throw new WrappingRuntimeException(throwable);
 		});
 	}
@@ -159,17 +155,14 @@ public class MailboxProcessor {
 	 * This method must be called to end the stream task when all actions for the tasks have been performed.
 	 */
 	public void allActionsCompleted() {
+		sendPriorityLetter(mailboxPoisonLetter);
+	}
+
+	private void sendPriorityLetter(Runnable priorityLetter) {
 		try {
-			if (mailboxExecutor.isMailboxThread()) {
-				mailboxLoopRunning = false;
-				ensureControlFlowSignalCheck();
-			} else {
-				mailbox.putFirst(mailboxPoisonLetter);
-			}
-		} catch (InterruptedException e) {
-			Thread.currentThread().interrupt();
+			mailbox.putFirst(priorityLetter);
 		} catch (MailboxStateException me) {
-			LOG.debug("Action context could not submit poison letter to mailbox.", me);
+			LOG.debug("Action context could not submit priority letter to mailbox.", me);
 		}
 	}
 
@@ -236,20 +229,7 @@ public class MailboxProcessor {
 	private void ensureControlFlowSignalCheck() {
 		// Make sure that mailbox#hasMail is true via a dummy letter so that the flag change is noticed.
 		if (!mailbox.hasMail()) {
-			try {
-				mailbox.tryPutMail(() -> {});
-			} catch (MailboxStateException me) {
-				LOG.debug("Mailbox closed when trying to submit letter for control flow signal.", me);
-			}
-		}
-	}
-
-	private void clearMailboxAndRunPriorityAction(Runnable priorityLetter) {
-		try {
-			List<Runnable> droppedRunnables = mailbox.clearAndPut(priorityLetter);
-			FutureUtils.cancelRunnableFutures(droppedRunnables);
-		} catch (MailboxStateException msex) {
-			LOG.debug("Mailbox already closed in cancel().", msex);
+			sendPriorityLetter(() -> {});
 		}
 	}
 
@@ -283,10 +263,14 @@ public class MailboxProcessor {
 
 		@Override
 		public void resume() {
-			Preconditions.checkState(
-				mailboxExecutor.isMailboxThread(),
-				"SuspendedMailboxDefaultAction::resume resume must only be called from the mailbox-thread!");
+			if (mailboxExecutor.isMailboxThread()) {
+				resumeInternal();
+			} else {
+				sendPriorityLetter(this::resumeInternal);
+			}
+		}
 
+		private void resumeInternal() {
 			if (suspendedDefaultAction == this) {
 				suspendedDefaultAction = null;
 			}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/SuspendedMailboxDefaultAction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/SuspendedMailboxDefaultAction.java
index ca31254..6b3f32c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/SuspendedMailboxDefaultAction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/SuspendedMailboxDefaultAction.java
@@ -24,7 +24,7 @@ package org.apache.flink.streaming.runtime.tasks.mailbox.execution;
 public interface SuspendedMailboxDefaultAction {
 
 	/**
-	 * Resume execution of the default action. Must only be called from the mailbox thread!.
+	 * Resume execution of the default action.
 	 */
 	void resume();
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
index 1a48136..b7b478e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.runtime.tasks.mailbox;
 
-import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.util.function.BiConsumerWithException;
 import org.apache.flink.util.function.FunctionWithException;
 import org.apache.flink.util.function.RunnableWithException;
@@ -30,7 +29,6 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.CountDownLatch;
@@ -42,9 +40,6 @@ import java.util.function.Consumer;
 public class MailboxImplTest {
 
 	private static final Runnable POISON_LETTER = () -> {};
-	private static final int CAPACITY_POW_2 = 2;
-	private static final int CAPACITY = 1 << CAPACITY_POW_2;
-
 	/**
 	 * Object under test.
 	 */
@@ -52,7 +47,7 @@ public class MailboxImplTest {
 
 	@Before
 	public void setUp() {
-		mailbox = new MailboxImpl(CAPACITY_POW_2);
+		mailbox = new MailboxImpl();
 		mailbox.open();
 	}
 
@@ -61,26 +56,6 @@ public class MailboxImplTest {
 		mailbox.close();
 	}
 
-	/**
-	 * Test for #clearAndPut should remove other pending events and enqueue directly to the head of the mailbox queue.
-	 */
-	@Test
-	public void testClearAndPut() throws Exception {
-
-		Runnable letterInstance = () -> {};
-
-		for (int i = 0; i < CAPACITY; ++i) {
-			Assert.assertTrue(mailbox.tryPutMail(letterInstance));
-		}
-
-		List<Runnable> droppedLetters = mailbox.clearAndPut(POISON_LETTER);
-
-		Assert.assertTrue(mailbox.hasMail());
-		Assert.assertEquals(POISON_LETTER, mailbox.tryTakeMail().get());
-		Assert.assertFalse(mailbox.hasMail());
-		Assert.assertEquals(CAPACITY, droppedLetters.size());
-	}
-
 	@Test
 	public void testPutAsHead() throws Exception {
 
@@ -88,38 +63,16 @@ public class MailboxImplTest {
 		Runnable instanceB = () -> {};
 		Runnable instanceC = () -> {};
 		Runnable instanceD = () -> {};
-		Runnable instanceE = () -> {};
 
+		mailbox.putMail(instanceC);
+		mailbox.putFirst(instanceB);
 		mailbox.putMail(instanceD);
-		mailbox.tryPutFirst(instanceC);
-		mailbox.putMail(instanceE);
 		mailbox.putFirst(instanceA);
 
-		OneShotLatch latch = new OneShotLatch();
-		Thread blockingPut = new Thread(() -> {
-			// ensure we are full
-			try {
-				if (!mailbox.tryPutFirst(() -> { })) {
-					latch.trigger();
-
-					mailbox.putFirst(instanceB);
-
-				}
-			} catch (InterruptedException e) {
-				Thread.currentThread().interrupt();
-			} catch (MailboxStateException ignore) {
-			}
-		});
-
-		blockingPut.start();
-		latch.await();
-
 		Assert.assertSame(instanceA, mailbox.takeMail());
-		blockingPut.join();
 		Assert.assertSame(instanceB, mailbox.takeMail());
 		Assert.assertSame(instanceC, mailbox.takeMail());
 		Assert.assertSame(instanceD, mailbox.takeMail());
-		Assert.assertSame(instanceE, mailbox.takeMail());
 
 		Assert.assertFalse(mailbox.tryTakeMail().isPresent());
 	}
@@ -129,15 +82,13 @@ public class MailboxImplTest {
 		final Queue<Runnable> testObjects = new LinkedList<>();
 		Assert.assertFalse(mailbox.hasMail());
 
-		for (int i = 0; i < CAPACITY; ++i) {
+		for (int i = 0; i < 10; ++i) {
 			Runnable letter = () -> {};
 			testObjects.add(letter);
-			Assert.assertTrue(mailbox.tryPutMail(letter));
+			mailbox.putMail(letter);
 			Assert.assertTrue(mailbox.hasMail());
 		}
 
-		Assert.assertFalse(mailbox.tryPutMail(() -> {}));
-
 		while (!testObjects.isEmpty()) {
 			Assert.assertEquals(testObjects.remove(), mailbox.tryTakeMail().get());
 			Assert.assertEquals(!testObjects.isEmpty(), mailbox.hasMail());
@@ -164,9 +115,7 @@ public class MailboxImplTest {
 				}
 				return optionalLetter.get();
 			}),
-			((mailbox, runnable) -> {
-				while (!mailbox.tryPutMail(runnable)) {}
-			}));
+			MailboxSender::putMail);
 	}
 
 	/**
@@ -176,7 +125,7 @@ public class MailboxImplTest {
 	public void testCloseUnblocks() throws InterruptedException {
 		testAllPuttingUnblocksInternal(Mailbox::close);
 		setUp();
-		testUnblocksInternal(() -> mailbox.takeMail(), Mailbox::close, MailboxStateException.class);
+		testUnblocksInternal(() -> mailbox.takeMail(), Mailbox::close);
 	}
 
 	/**
@@ -218,16 +167,6 @@ public class MailboxImplTest {
 
 	private void testLifecyclePuttingInternal() throws Exception {
 		try {
-			mailbox.tryPutMail(() -> {});
-			Assert.fail();
-		} catch (MailboxStateException ignore) {
-		}
-		try {
-			mailbox.tryPutFirst(() -> {});
-			Assert.fail();
-		} catch (MailboxStateException ignore) {
-		}
-		try {
 			mailbox.putMail(() -> {});
 			Assert.fail();
 		} catch (MailboxStateException ignore) {
@@ -240,18 +179,15 @@ public class MailboxImplTest {
 	}
 
 	private void testAllPuttingUnblocksInternal(Consumer<Mailbox> unblockMethod) throws InterruptedException {
-		testUnblocksInternal(() -> mailbox.putMail(() -> {}), unblockMethod, MailboxStateException.class);
-		setUp();
-		testUnblocksInternal(() -> mailbox.putFirst(() -> {}), unblockMethod, MailboxStateException.class);
+		testUnblocksInternal(() -> mailbox.putMail(() -> {}), unblockMethod);
 		setUp();
-		testUnblocksInternal(() -> mailbox.clearAndPut(() -> {}), unblockMethod, MailboxStateException.class);
+		testUnblocksInternal(() -> mailbox.putFirst(() -> {}), unblockMethod);
 	}
 
 	private void testUnblocksInternal(
 		RunnableWithException testMethod,
-		Consumer<Mailbox> unblockMethod,
-		Class<?> expectedExceptionClass) throws InterruptedException {
-		final Thread[] blockedThreads = new Thread[CAPACITY * 2];
+		Consumer<Mailbox> unblockMethod) throws InterruptedException {
+		final Thread[] blockedThreads = new Thread[8];
 		final Exception[] exceptions = new Exception[blockedThreads.length];
 
 		CountDownLatch countDownLatch = new CountDownLatch(blockedThreads.length);
@@ -280,7 +216,7 @@ public class MailboxImplTest {
 		}
 
 		for (Exception exception : exceptions) {
-			Assert.assertEquals(expectedExceptionClass, exception.getClass());
+			Assert.assertEquals(MailboxStateException.class, exception.getClass());
 		}
 
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImplTest.java
index 92e106d..7c25e5f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImplTest.java
@@ -75,13 +75,13 @@ public class MailboxExecutorServiceImplTest {
 		Assert.assertFalse(mailboxExecutorService.isShutdown());
 		Assert.assertFalse(mailboxExecutorService.isTerminated());
 		final TestRunnable testRunnable = new TestRunnable();
-		Assert.assertTrue(mailboxExecutorService.tryExecute(testRunnable));
+		mailboxExecutorService.execute(testRunnable);
 		Assert.assertEquals(testRunnable, mailbox.tryTakeMail().get());
 		CompletableFuture.runAsync(() -> mailboxExecutorService.execute(testRunnable), otherThreadExecutor).get();
 		Assert.assertEquals(testRunnable, mailbox.takeMail());
 		final TestRunnable yieldRun = new TestRunnable();
 		final TestRunnable leftoverRun = new TestRunnable();
-		Assert.assertTrue(mailboxExecutorService.tryExecute(yieldRun));
+		mailboxExecutorService.execute(yieldRun);
 		Future<?> leftoverFuture = CompletableFuture.supplyAsync(
 			() -> mailboxExecutorService.submit(leftoverRun), otherThreadExecutor).get();
 		mailboxExecutorService.shutdown();
@@ -96,7 +96,7 @@ public class MailboxExecutorServiceImplTest {
 		}
 
 		try {
-			CompletableFuture.runAsync(() -> mailboxExecutorService.tryExecute(testRunnable), otherThreadExecutor).get();
+			CompletableFuture.runAsync(() -> mailboxExecutorService.execute(testRunnable), otherThreadExecutor).get();
 			Assert.fail("execution should not work after shutdown().");
 		} catch (ExecutionException expected) {
 			Assert.assertTrue(expected.getCause() instanceof RejectedExecutionException);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessorTest.java
index 80c71dd..46d5231 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessorTest.java
@@ -38,7 +38,7 @@ public class MailboxProcessorTest {
 	public void testRejectIfNotOpen() {
 		MailboxProcessor mailboxProcessor = new MailboxProcessor((ctx) -> {});
 		try {
-			mailboxProcessor.getMailboxExecutor().tryExecute(() -> {});
+			mailboxProcessor.getMailboxExecutor().execute(() -> {});
 			Assert.fail("Should not be able to accept runnables if not opened.");
 		} catch (RejectedExecutionException expected) {
 		}
@@ -49,11 +49,11 @@ public class MailboxProcessorTest {
 		MailboxProcessor mailboxProcessor = new MailboxProcessor((ctx) -> {});
 		FutureTask<Void> testRunnableFuture = new FutureTask<>(() -> {}, null);
 		mailboxProcessor.open();
-		mailboxProcessor.getMailboxExecutor().tryExecute(testRunnableFuture);
+		mailboxProcessor.getMailboxExecutor().execute(testRunnableFuture);
 		mailboxProcessor.prepareClose();
 
 		try {
-			mailboxProcessor.getMailboxExecutor().tryExecute(() -> {});
+			mailboxProcessor.getMailboxExecutor().execute(() -> {});
 			Assert.fail("Should not be able to accept runnables if not opened.");
 		} catch (RejectedExecutionException expected) {
 		}
@@ -123,11 +123,12 @@ public class MailboxProcessorTest {
 			}
 		};
 
-		start(mailboxThread);
+		MailboxProcessor mailboxProcessor = start(mailboxThread);
 		actionSuspendedLatch.await();
 		Assert.assertEquals(blockAfterInvocations, counter.get());
 
-		suspendedActionRef.get().resume();
+		SuspendedMailboxDefaultAction suspendedMailboxDefaultAction = suspendedActionRef.get();
+		mailboxProcessor.getMailboxExecutor().execute(suspendedMailboxDefaultAction::resume);
 		stop(mailboxThread);
 		Assert.assertEquals(totalInvocations, counter.get());
 	}
@@ -170,7 +171,7 @@ public class MailboxProcessorTest {
 				final SuspendedMailboxDefaultAction resume =
 					suspendedActionRef.getAndSet(null);
 				if (resume != null) {
-					resume.resume();
+					mailboxProcessor.getMailboxExecutor().execute(resume::resume);
 				} else {
 					try {
 						mailboxProcessor.getMailboxExecutor().execute(() -> { });
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/TestMailboxExecutor.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/TestMailboxExecutor.java
index 31126a8..979fe58 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/TestMailboxExecutor.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/TestMailboxExecutor.java
@@ -46,12 +46,6 @@ public class TestMailboxExecutor implements MailboxExecutor {
 	}
 
 	@Override
-	public boolean tryExecute(Runnable command) {
-		execute(command);
-		return true;
-	}
-
-	@Override
 	public void yield() throws InterruptedException {
 		synchronized (lock) {
 			lock.wait(1);