You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2019/05/22 08:32:34 UTC

[flink] branch master updated (3813bb9 -> ead9139)

This is an automated email from the ASF dual-hosted git repository.

srichter pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 3813bb9  [FLINK-12241][hive] Support Flink functions in catalog function APIs of HiveCatalog
     new 022f6cc  [FLINK-12478, FLINK-12480][runtime] Introduce mailbox to StreamTask main-loop.
     new ead9139  [FLINK-12483][runtime] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/tasks/OneInputStreamTask.java          |  12 +-
 .../streaming/runtime/tasks/SourceStreamTask.java  |  71 ++++++-
 .../runtime/tasks/StreamIterationHead.java         | 105 ++++-----
 .../flink/streaming/runtime/tasks/StreamTask.java  |  82 ++++++-
 .../runtime/tasks/TwoInputStreamTask.java          |  13 +-
 .../streaming/runtime/tasks/mailbox/Mailbox.java   |  23 +-
 .../runtime/tasks/mailbox/MailboxImpl.java         | 236 +++++++++++++++++++++
 .../runtime/tasks/mailbox/MailboxReceiver.java     |  59 ++++++
 .../runtime/tasks/mailbox/MailboxSender.java       |  52 +++++
 ...heckpointExceptionHandlerConfigurationTest.java |   4 +-
 .../tasks/StreamTaskCancellationBarrierTest.java   |   4 +-
 .../runtime/tasks/StreamTaskTerminationTest.java   |   3 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    |  21 +-
 .../runtime/tasks/SynchronousCheckpointITCase.java |   3 +-
 .../runtime/tasks/SynchronousCheckpointTest.java   |   3 +-
 .../tasks/TaskCheckpointingBehaviourTest.java      |   4 +-
 .../runtime/tasks/mailbox/MailboxImplTest.java     | 170 +++++++++++++++
 .../flink/streaming/util/MockStreamTask.java       |   4 +-
 .../jobmaster/JobMasterStopWithSavepointIT.java    |  10 +-
 19 files changed, 762 insertions(+), 117 deletions(-)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoWriter.java => flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java (60%)
 create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
 create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java
 create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
 create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java


[flink] 01/02: [FLINK-12478, FLINK-12480][runtime] Introduce mailbox to StreamTask main-loop.

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 022f6cceef65859bc6f172151d09140038297f69
Author: Stefan Richter <s....@data-artisans.com>
AuthorDate: Fri May 10 11:20:02 2019 +0200

    [FLINK-12478, FLINK-12480][runtime] Introduce mailbox to StreamTask main-loop.
    
    This closes #8409.
    This closes #8431.
    
    This also decomposes monolithic run-loops in StreamTask implementations into step-wise calls.
---
 .../runtime/tasks/OneInputStreamTask.java          |  12 +-
 .../streaming/runtime/tasks/SourceStreamTask.java  |   5 +-
 .../runtime/tasks/StreamIterationHead.java         | 105 ++++-----
 .../flink/streaming/runtime/tasks/StreamTask.java  |  82 ++++++-
 .../runtime/tasks/TwoInputStreamTask.java          |  13 +-
 .../streaming/runtime/tasks/mailbox/Mailbox.java   |  36 ++++
 .../runtime/tasks/mailbox/MailboxImpl.java         | 236 +++++++++++++++++++++
 .../runtime/tasks/mailbox/MailboxReceiver.java     |  59 ++++++
 .../runtime/tasks/mailbox/MailboxSender.java       |  52 +++++
 ...heckpointExceptionHandlerConfigurationTest.java |   4 +-
 .../tasks/StreamTaskCancellationBarrierTest.java   |   4 +-
 .../runtime/tasks/StreamTaskTerminationTest.java   |   3 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    |  21 +-
 .../runtime/tasks/SynchronousCheckpointITCase.java |   3 +-
 .../runtime/tasks/SynchronousCheckpointTest.java   |   3 +-
 .../tasks/TaskCheckpointingBehaviourTest.java      |   4 +-
 .../runtime/tasks/mailbox/MailboxImplTest.java     | 170 +++++++++++++++
 .../flink/streaming/util/MockStreamTask.java       |   4 +-
 .../jobmaster/JobMasterStopWithSavepointIT.java    |  10 +-
 19 files changed, 726 insertions(+), 100 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 7498518..7b82d8f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -39,8 +39,6 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 
 	private StreamInputProcessor<IN> inputProcessor;
 
-	private volatile boolean running = true;
-
 	private final WatermarkGauge inputWatermarkGauge = new WatermarkGauge();
 
 	/**
@@ -98,12 +96,9 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 	}
 
 	@Override
-	protected void run() throws Exception {
-		// cache processor reference on the stack, to make the code more JIT friendly
-		final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
-
-		while (running && inputProcessor.processInput()) {
-			// all the work happens in the "processInput" method
+	protected void performDefaultAction(ActionContext context) throws Exception {
+		if (!inputProcessor.processInput()) {
+			context.allActionsCompleted();
 		}
 	}
 
@@ -116,6 +111,5 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 
 	@Override
 	protected void cancelTask() {
-		running = false;
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 1a1c529..fd50a1a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -98,8 +98,11 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 	}
 
 	@Override
-	protected void run() throws Exception {
+	protected void performDefaultAction(ActionContext context) throws Exception {
+		// Against the usual contract of this method, this implementation is not step-wise but blocking instead for
+		// compatibility reasons with the current source interface (source functions run as a loop, not in steps).
 		headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
+		context.allActionsCompleted();
 	}
 
 	@Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index ecef7f0..d25bd23 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,88 +42,72 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
 
-	private volatile boolean running = true;
+	private RecordWriterOutput<OUT>[] streamOutputs;
+
+	private final BlockingQueue<StreamRecord<OUT>> dataChannel;
+	private final String brokerID;
+	private final long iterationWaitTime;
+	private final boolean shouldWait;
 
 	public StreamIterationHead(Environment env) {
 		super(env);
+		final String iterationId = getConfiguration().getIterationId();
+		if (iterationId == null || iterationId.length() == 0) {
+			throw new FlinkRuntimeException("Missing iteration ID in the task configuration");
+		}
+
+		this.dataChannel = new ArrayBlockingQueue<>(1);
+		this.brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId,
+			getEnvironment().getTaskInfo().getIndexOfThisSubtask());
+		this.iterationWaitTime = getConfiguration().getIterationWaitTime();
+		this.shouldWait = iterationWaitTime > 0;
 	}
 
 	// ------------------------------------------------------------------------
 
 	@Override
-	protected void run() throws Exception {
-
-		final String iterationId = getConfiguration().getIterationId();
-		if (iterationId == null || iterationId.length() == 0) {
-			throw new Exception("Missing iteration ID in the task configuration");
+	protected void performDefaultAction(ActionContext context) throws Exception {
+		StreamRecord<OUT> nextRecord = shouldWait ?
+			dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS) :
+			dataChannel.take();
+
+		if (nextRecord != null) {
+			synchronized (getCheckpointLock()) {
+				for (RecordWriterOutput<OUT> output : streamOutputs) {
+					output.collect(nextRecord);
+				}
+			}
+		} else {
+			context.allActionsCompleted();
 		}
+	}
 
-		final String brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId ,
-				getEnvironment().getTaskInfo().getIndexOfThisSubtask());
-
-		final long iterationWaitTime = getConfiguration().getIterationWaitTime();
-		final boolean shouldWait = iterationWaitTime > 0;
-
-		final BlockingQueue<StreamRecord<OUT>> dataChannel = new ArrayBlockingQueue<StreamRecord<OUT>>(1);
+	// ------------------------------------------------------------------------
 
+	@SuppressWarnings("unchecked")
+	@Override
+	public void init() {
 		// offer the queue for the tail
 		BlockingQueueBroker.INSTANCE.handIn(brokerID, dataChannel);
 		LOG.info("Iteration head {} added feedback queue under {}", getName(), brokerID);
 
-		// do the work
-		try {
-			@SuppressWarnings("unchecked")
-			RecordWriterOutput<OUT>[] outputs = (RecordWriterOutput<OUT>[]) getStreamOutputs();
-
-			// If timestamps are enabled we make sure to remove cyclic watermark dependencies
-			if (isSerializingTimestamps()) {
-				synchronized (getCheckpointLock()) {
-					for (RecordWriterOutput<OUT> output : outputs) {
-						output.emitWatermark(new Watermark(Long.MAX_VALUE));
-					}
-				}
-			}
+		this.streamOutputs = (RecordWriterOutput<OUT>[]) getStreamOutputs();
 
-			while (running) {
-				StreamRecord<OUT> nextRecord = shouldWait ?
-					dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS) :
-					dataChannel.take();
-
-				if (nextRecord != null) {
-					synchronized (getCheckpointLock()) {
-						for (RecordWriterOutput<OUT> output : outputs) {
-							output.collect(nextRecord);
-						}
-					}
-				}
-				else {
-					// done
-					break;
+		// If timestamps are enabled we make sure to remove cyclic watermark dependencies
+		if (isSerializingTimestamps()) {
+			synchronized (getCheckpointLock()) {
+				for (RecordWriterOutput<OUT> output : streamOutputs) {
+					output.emitWatermark(new Watermark(Long.MAX_VALUE));
 				}
 			}
 		}
-		finally {
-			// make sure that we remove the queue from the broker, to prevent a resource leak
-			BlockingQueueBroker.INSTANCE.remove(brokerID);
-			LOG.info("Iteration head {} removed feedback queue under {}", getName(), brokerID);
-		}
-	}
-
-	@Override
-	protected void cancelTask() {
-		running = false;
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void init() {
-		// does not hold any resources, no initialization necessary
 	}
 
 	@Override
-	protected void cleanup() throws Exception {
-		// does not hold any resources, no cleanup necessary
+	protected void cleanup() {
+		// make sure that we remove the queue from the broker, to prevent a resource leak
+		BlockingQueueBroker.INSTANCE.remove(brokerID);
+		LOG.info("Iteration head {} removed feedback queue under {}", getName(), brokerID);
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 8a3d006..2df565d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -55,8 +55,11 @@ import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitio
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.mailbox.Mailbox;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxImpl;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,6 +71,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -124,6 +128,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	/** The logger used by the StreamTask and its subclasses. */
 	private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
 
+	/** Sentinel letter: when {@link #run()} takes it from the mailbox (identity check), the mailbox loop returns. */
+	private static final Runnable POISON_LETTER = () -> {};
+
+	/** No-op letter put by {@link ActionContext#actionsAvailable()} to "wake up" a mailbox loop waiting for mail. */
+	private static final Runnable DEFAULT_ACTION_AVAILABLE = () -> {};
+
 	// ------------------------------------------------------------------------
 
 	/**
@@ -182,6 +192,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 	private final SynchronousSavepointLatch syncSavepointLatch;
 
+	protected final Mailbox mailbox;
+
 	// ------------------------------------------------------------------------
 
 	/**
@@ -214,6 +226,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		this.accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap();
 		this.recordWriters = createRecordWriters(configuration, environment);
 		this.syncSavepointLatch = new SynchronousSavepointLatch();
+		this.mailbox = new MailboxImpl();
 	}
 
 	// ------------------------------------------------------------------------
@@ -222,13 +235,41 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 	protected abstract void init() throws Exception;
 
-	protected abstract void run() throws Exception;
-
 	protected abstract void cleanup() throws Exception;
 
 	protected abstract void cancelTask() throws Exception;
 
 	/**
+	 * This method implements the default action of the task (e.g. processing one event from the input); the mailbox
+	 * loop invokes it repeatedly. Implementations should (in general) be non-blocking.
+	 *
+	 * @param context context object for feedback to the stream task, e.g. {@link ActionContext#allActionsCompleted()}.
+	 * @throws Exception on any problems in the action.
+	 */
+	protected abstract void performDefaultAction(ActionContext context) throws Exception;
+
+	/**
+	 * Runs the stream task's main loop: runs all pending mailbox letters, then the default action, until poisoned.
+	 */
+	private void run() throws Exception {
+		final ActionContext actionContext = new ActionContext();
+		while (true) {
+			if (mailbox.hasMail()) {
+				Optional<Runnable> maybeLetter;
+				while ((maybeLetter = mailbox.tryTakeMail()).isPresent()) {
+					Runnable letter = maybeLetter.get();
+					if (letter == POISON_LETTER) {
+						return;
+					}
+					letter.run();
+				}
+			}
+
+			performDefaultAction(actionContext);
+		}
+	}
+
+	/**
 	 * Emits the {@link org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK MAX_WATERMARK}
 	 * so that all registered timers are fired.
 	 *
@@ -426,6 +467,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 	@Override
 	public final void cancel() throws Exception {
+		mailbox.clearAndPut(POISON_LETTER);
 		isRunning = false;
 		canceled = true;
 
@@ -1280,4 +1322,40 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
 		return output;
 	}
+
+	/**
+	 * The action context is passed as parameter into the default action method and holds control methods through which
+	 * the default action gives feedback to the mailbox.
+	 */
+	public final class ActionContext {
+
+		private final Runnable actionUnavailableLetter = ThrowingRunnable.unchecked(mailbox::waitUntilHasMail);
+
+		/**
+		 * Ends the stream task once all its actions have been performed: drops pending letters, enqueues the poison letter.
+		 */
+		public void allActionsCompleted() {
+			mailbox.clearAndPut(POISON_LETTER);
+		}
+
+		/**
+		 * Calling this method signals that the mailbox-thread should continue invoking the default action, e.g. because
+		 * new input became available for processing. Blocks while the mailbox is full.
+		 *
+		 * @throws InterruptedException on interruption.
+		 */
+		public void actionsAvailable() throws InterruptedException {
+			mailbox.putMail(DEFAULT_ACTION_AVAILABLE);
+		}
+
+		/**
+		 * Calling this method signals that the mailbox-thread should (temporarily) stop invoking the default action,
+		 * e.g. because there is currently no input available; it then blocks until further mail arrives.
+		 *
+		 * @throws InterruptedException on interruption.
+		 */
+		public void actionsUnavailable() throws InterruptedException {
+			mailbox.putMail(actionUnavailableLetter);
+		}
+	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 546ccdb..934f2cb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -40,8 +40,6 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 
 	private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
 
-	private volatile boolean running = true;
-
 	private final WatermarkGauge input1WatermarkGauge;
 	private final WatermarkGauge input2WatermarkGauge;
 	private final MinWatermarkGauge minInputWatermarkGauge;
@@ -110,12 +108,9 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 	}
 
 	@Override
-	protected void run() throws Exception {
-		// cache processor reference on the stack, to make the code more JIT friendly
-		final StreamTwoInputProcessor<IN1, IN2> inputProcessor = this.inputProcessor;
-
-		while (running && inputProcessor.processInput()) {
-			// all the work happens in the "processInput" method
+	protected void performDefaultAction(ActionContext context) throws Exception {
+		if (!inputProcessor.processInput()) {
+			context.allActionsCompleted();
 		}
 	}
 
@@ -128,6 +123,6 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 
 	@Override
 	protected void cancelTask() {
-		running = false;
+
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
new file mode 100644
index 0000000..dfa8d76
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A mailbox is a blocking queue for inter-thread message exchange in the form of {@link Runnable} objects between
+ * multiple producer threads and a single consumer thread.
+ */
+public interface Mailbox extends MailboxReceiver, MailboxSender {
+
+	/**
+	 * Atomically drops all pending letters and enqueues the given priorityAction, which thereby becomes the head of the
+	 * mailbox, i.e. the next letter to be taken.
+	 *
+	 * @param priorityAction action to enqueue atomically after the mailbox was cleared.
+	 */
+	void clearAndPut(@Nonnull Runnable priorityAction);
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
new file mode 100644
index 0000000..411efd1
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Optional;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Implementation of {@link Mailbox} inspired by {@link java.util.concurrent.ArrayBlockingQueue} and tailored towards
+ * our use case with multiple writers, single reader and volatile reads instead of lock & read on {@link #count}.
+ */
+@ThreadSafe
+public class MailboxImpl implements Mailbox {
+
+	/** The enqueued letters, stored in a ring buffer whose length is a power of two. */
+	@GuardedBy("lock")
+	private final Runnable[] ringBuffer;
+
+	/** Lock for all concurrent ops. */
+	private final ReentrantLock lock;
+
+	/** Condition that is triggered when the buffer is no longer empty. */
+	@GuardedBy("lock")
+	private final Condition notEmpty;
+
+	/** Condition that is triggered when the buffer is no longer full. */
+	@GuardedBy("lock")
+	private final Condition notFull;
+
+	/** Index of the ring buffer head, i.e. the slot of the next letter to take. */
+	@GuardedBy("lock")
+	private int headIndex;
+
+	/** Index of the ring buffer tail, i.e. the slot that receives the next letter. */
+	@GuardedBy("lock")
+	private int tailIndex;
+
+	/**
+	 * Number of letters in the mailbox. Only written while holding {@link #lock}, but volatile so that
+	 * {@link #hasMail()} can read it without acquiring the lock.
+	 */
+	@GuardedBy("lock")
+	private volatile int count;
+
+	/** A mask to wrap around the indexes of the ring buffer. We use this to avoid ifs or modulo ops. */
+	private final int moduloMask;
+
+	/** Creates a mailbox with the default capacity of 2^6 = 64 letters. */
+	public MailboxImpl() {
+		this(6); // 2^6 = 64
+	}
+
+	/**
+	 * Creates a mailbox with a capacity of 2^capacityPow2 letters.
+	 *
+	 * @param capacityPow2 base-2 logarithm of the capacity, must be in [0, 30].
+	 * @throws IllegalArgumentException if capacityPow2 is outside of [0, 30].
+	 */
+	public MailboxImpl(int capacityPow2) {
+		// Validate the exponent, not the shifted value: Java masks int shift distances to 5 bits, so checking only
+		// the result would silently accept e.g. 32 (1 << 32 == 1) or -26 (1 << -26 == 64).
+		Preconditions.checkArgument(capacityPow2 >= 0 && capacityPow2 < Integer.SIZE - 1,
+			"capacityPow2 must be in [0, 30], but was %s.", capacityPow2);
+		final int capacity = 1 << capacityPow2;
+		this.moduloMask = capacity - 1;
+		this.ringBuffer = new Runnable[capacity];
+		this.lock = new ReentrantLock();
+		this.notEmpty = lock.newCondition();
+		this.notFull = lock.newCondition();
+	}
+
+	@Override
+	public boolean hasMail() {
+		return !isEmpty();
+	}
+
+	@Override
+	public Optional<Runnable> tryTakeMail() {
+		final ReentrantLock lock = this.lock;
+		lock.lock();
+		try {
+			return isEmpty() ? Optional.empty() : Optional.of(takeInternal());
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	@Nonnull
+	@Override
+	public Runnable takeMail() throws InterruptedException {
+		final ReentrantLock lock = this.lock;
+		lock.lockInterruptibly();
+		try {
+			while (isEmpty()) {
+				notEmpty.await();
+			}
+			return takeInternal();
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	@Override
+	public void waitUntilHasMail() throws InterruptedException {
+		final ReentrantLock lock = this.lock;
+		lock.lockInterruptibly();
+		try {
+			while (isEmpty()) {
+				notEmpty.await();
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	//------------------------------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean tryPutMail(@Nonnull Runnable letter) {
+		final ReentrantLock lock = this.lock;
+		lock.lock();
+		try {
+			if (isFull()) {
+				return false;
+			} else {
+				putInternal(letter);
+				return true;
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	@Override
+	public void putMail(@Nonnull Runnable letter) throws InterruptedException {
+		final ReentrantLock lock = this.lock;
+		lock.lockInterruptibly();
+		try {
+			while (isFull()) {
+				notFull.await();
+			}
+			putInternal(letter);
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	@Override
+	public void waitUntilHasCapacity() throws InterruptedException {
+		final ReentrantLock lock = this.lock;
+		lock.lockInterruptibly();
+		try {
+			while (isFull()) {
+				notFull.await();
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	//------------------------------------------------------------------------------------------------------------------
+
+	private void putInternal(Runnable letter) {
+		assert lock.isHeldByCurrentThread();
+		this.ringBuffer[tailIndex] = letter;
+		tailIndex = increaseIndexWithWrapAround(tailIndex);
+		++count;
+		notEmpty.signal();
+	}
+
+	private Runnable takeInternal() {
+		assert lock.isHeldByCurrentThread();
+		final Runnable[] buffer = this.ringBuffer;
+		Runnable letter = buffer[headIndex];
+		buffer[headIndex] = null;
+		headIndex = increaseIndexWithWrapAround(headIndex);
+		--count;
+		notFull.signal();
+		return letter;
+	}
+
+	private int increaseIndexWithWrapAround(int old) {
+		return (old + 1) & moduloMask;
+	}
+
+	private boolean isFull() {
+		return count >= ringBuffer.length;
+	}
+
+	private boolean isEmpty() {
+		return count == 0;
+	}
+
+	/**
+	 * Drops all pending letters and enqueues the given action, waking up producers blocked on a full mailbox.
+	 */
+	@Override
+	public void clearAndPut(@Nonnull Runnable priorityAction) {
+		lock.lock();
+		try {
+			for (int remaining = count; remaining > 0; --remaining) {
+				ringBuffer[headIndex] = null;
+				headIndex = increaseIndexWithWrapAround(headIndex);
+			}
+			count = 0;
+			// capacity was freed: wake up all producers waiting in putMail / waitUntilHasCapacity, not just one
+			notFull.signalAll();
+			putInternal(priorityAction);
+		} finally {
+			lock.unlock();
+		}
+	}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java
new file mode 100644
index 0000000..189687e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import javax.annotation.Nonnull;
+
+import java.util.Optional;
+
+/**
+ * Consumer-facing side of the {@link Mailbox} interface. This is used to dequeue letters. The mailbox returns letters
+ * in the order in which they were enqueued. A mailbox should only be consumed by one thread at a time.
+ */
+public interface MailboxReceiver {
+
+	/**
+	 * Returns <code>true</code> if the mailbox contains at least one letter, <code>false</code> otherwise.
+	 */
+	boolean hasMail();
+
+	/**
+	 * Takes the oldest letter from the mailbox (head of queue) without waiting for mail: returns it in an optional if the
+	 * mailbox is not empty, or an empty optional otherwise.
+	 *
+	 * @return an optional with either the oldest letter from the mailbox (head of queue) if the mailbox is not empty or
+	 * an empty optional otherwise.
+	 */
+	Optional<Runnable> tryTakeMail();
+
+	/**
+	 * This method takes the oldest letter from the mailbox (head of queue), blocking until a letter is available.
+	 *
+	 * @return the oldest letter from the mailbox (head of queue).
+	 * @throws InterruptedException on interruption.
+	 */
+	@Nonnull
+	Runnable takeMail() throws InterruptedException;
+
+	/**
+	 * This method blocks while the mailbox is empty and returns as soon as mail becomes available.
+	 * @throws InterruptedException on interruption.
+	 */
+	void waitUntilHasMail() throws InterruptedException;
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
new file mode 100644
index 0000000..1829125
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Producer-facing side of the {@link Mailbox} interface. This is used to enqueue letters. Multiple producer threads
+ * can put letters into the same mailbox.
+ */
+public interface MailboxSender {
+
+	/**
+	 * Enqueues the given letter to the mailbox if capacity is available. Returns <code>true</code> on success
+	 * and <code>false</code> if the mailbox was already full, in which case the letter is not enqueued.
+	 *
+	 * @param letter the letter to enqueue.
+	 * @return <code>true</code> iff the letter was enqueued.
+	 */
+	boolean tryPutMail(@Nonnull Runnable letter);
+
+	/**
+	 * Enqueues the given letter to the mailbox, blocking until there is capacity for a successful put.
+	 *
+	 * @param letter the letter to enqueue.
+	 * @throws InterruptedException on interruption.
+	 */
+	void putMail(@Nonnull Runnable letter) throws InterruptedException;
+
+	/**
+	 * Blocks until the mailbox has capacity to enqueue new letters; returns immediately if it is not full.
+	 *
+	 * @throws InterruptedException on interruption.
+	 */
+	void waitUntilHasCapacity() throws InterruptedException;
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
index 08cee55..17ab88f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
@@ -80,7 +80,9 @@ public class CheckpointExceptionHandlerConfigurationTest extends TestLogger {
 			protected void init() throws Exception {}
 
 			@Override
-			protected void run() throws Exception {}
+			protected void performDefaultAction(ActionContext context) throws Exception {
+				context.allActionsCompleted();
+			}
 
 			@Override
 			protected void cleanup() throws Exception {}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
index 80a38e4..d1b3697 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -188,7 +188,9 @@ public class StreamTaskCancellationBarrierTest {
 		}
 
 		@Override
-		protected void run() throws Exception {}
+		protected void performDefaultAction(ActionContext context) throws Exception {
+			context.allActionsCompleted();
+		}
 
 		@Override
 		protected void cleanup() throws Exception {}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 8918b0a..c079d15 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -225,10 +225,11 @@ public class StreamTaskTerminationTest extends TestLogger {
 		}
 
 		@Override
-		protected void run() throws Exception {
+		protected void performDefaultAction(ActionContext context) throws Exception {
 			RUN_LATCH.trigger();
 			// wait until we have started an asynchronous checkpoint
 			CHECKPOINTING_LATCH.await();
+			context.allActionsCompleted();
 		}
 
 		@Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index e24949d..af779b6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -844,7 +844,9 @@ public class StreamTaskTest extends TestLogger {
 		protected void init() throws Exception {}
 
 		@Override
-		protected void run() throws Exception {}
+		protected void performDefaultAction(ActionContext context) throws Exception {
+			context.allActionsCompleted();
+		}
 
 		@Override
 		protected void cleanup() throws Exception {}
@@ -1031,7 +1033,9 @@ public class StreamTaskTest extends TestLogger {
 		protected void init() throws Exception {}
 
 		@Override
-		protected void run() throws Exception {}
+		protected void performDefaultAction(ActionContext context) throws Exception {
+			context.allActionsCompleted();
+		}
 
 		@Override
 		protected void cleanup() throws Exception {}
@@ -1059,10 +1063,11 @@ public class StreamTaskTest extends TestLogger {
 		}
 
 		@Override
-		protected void run() throws Exception {
+		protected void performDefaultAction(ActionContext context) throws Exception {
 			if (fail) {
 				throw new RuntimeException();
 			}
+			context.allActionsCompleted();
 		}
 
 		@Override
@@ -1149,7 +1154,7 @@ public class StreamTaskTest extends TestLogger {
 		protected void init() {}
 
 		@Override
-		protected void run() throws Exception {
+		protected void performDefaultAction(ActionContext context) throws Exception {
 			holder = new LockHolder(getCheckpointLock(), latch);
 			holder.start();
 			latch.await();
@@ -1164,6 +1169,7 @@ public class StreamTaskTest extends TestLogger {
 				// restore interruption state
 				Thread.currentThread().interrupt();
 			}
+			context.allActionsCompleted();
 		}
 
 		@Override
@@ -1193,7 +1199,7 @@ public class StreamTaskTest extends TestLogger {
 		protected void init() {}
 
 		@Override
-		protected void run() throws Exception {
+		protected void performDefaultAction(ActionContext context) throws Exception {
 			final OneShotLatch latch = new OneShotLatch();
 			final Object lock = new Object();
 
@@ -1219,7 +1225,7 @@ public class StreamTaskTest extends TestLogger {
 			finally {
 				holder.close();
 			}
-
+			context.allActionsCompleted();
 		}
 
 		@Override
@@ -1259,8 +1265,9 @@ public class StreamTaskTest extends TestLogger {
 		}
 
 		@Override
-		protected void run() throws Exception {
+		protected void performDefaultAction(ActionContext context) throws Exception {
 			syncLatch.await();
+			context.allActionsCompleted();
 		}
 
 		@Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
index 222133a..778a7d0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
@@ -156,9 +156,10 @@ public class SynchronousCheckpointITCase {
 		}
 
 		@Override
-		protected void run() throws Exception {
+		protected void performDefaultAction(ActionContext context) throws Exception {
 			executionLatch.trigger();
 			cancellationLatch.await();
+			context.allActionsCompleted();
 		}
 
 		@Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
index 2ad8c6f..f7e36d4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
@@ -174,9 +174,10 @@ public class SynchronousCheckpointTest {
 		protected void init() {}
 
 		@Override
-		protected void run() throws Exception {
+		protected void performDefaultAction(ActionContext context) throws Exception {
 			runningLatch.trigger();
 			execLatch.await();
+			context.allActionsCompleted();
 		}
 
 		@Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index 244c8aa..9418e14 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -506,8 +506,7 @@ public class TaskCheckpointingBehaviourTest extends TestLogger {
 		public void init() {}
 
 		@Override
-		protected void run() throws Exception {
-
+		protected void performDefaultAction(ActionContext context) throws Exception {
 			triggerCheckpointOnBarrier(
 				new CheckpointMetaData(
 					11L,
@@ -518,6 +517,7 @@ public class TaskCheckpointingBehaviourTest extends TestLogger {
 			while (isRunning()) {
 				Thread.sleep(1L);
 			}
+			context.allActionsCompleted();
 		}
 
 		@Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
new file mode 100644
index 0000000..fc7f19c
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import org.apache.flink.util.function.BiConsumerWithException;
+import org.apache.flink.util.function.FunctionWithException;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * Unit tests for {@link MailboxImpl}.
+ */
+public class MailboxImplTest {
+
+	private static final Runnable POISON_LETTER = () -> {}; // sentinel letter, compared by identity
+	private static final int CAPACITY_POW_2 = 1;
+	private static final int CAPACITY = 1 << CAPACITY_POW_2;
+
+	/**
+	 * Object under test.
+	 */
+	private Mailbox mailbox;
+
+	@Before
+	public void setUp() throws Exception {
+		mailbox = new MailboxImpl(CAPACITY_POW_2);
+	}
+
+	/**
+	 * Tests that #clearAndPut removes all pending letters and enqueues the given letter at the head of the mailbox queue.
+	 */
+	@Test
+	public void testClearAndPut() {
+		for (int i = 0; i < CAPACITY; ++i) {
+			Assert.assertTrue(mailbox.tryPutMail(() -> {}));
+		}
+
+		mailbox.clearAndPut(POISON_LETTER);
+
+		Assert.assertTrue(mailbox.hasMail());
+		Assert.assertEquals(POISON_LETTER, mailbox.tryTakeMail().get());
+		Assert.assertFalse(mailbox.hasMail());
+	}
+
+	@Test
+	public void testContracts() throws Exception {
+		final Queue<Runnable> testObjects = new LinkedList<>();
+		Assert.assertFalse(mailbox.hasMail());
+
+		for (int i = 0; i < CAPACITY; ++i) {
+			Runnable letter = () -> {};
+			testObjects.add(letter);
+			Assert.assertTrue(mailbox.tryPutMail(letter));
+			Assert.assertTrue(mailbox.hasMail());
+		}
+
+		Assert.assertFalse(mailbox.tryPutMail(() -> {})); // the mailbox is full at this point
+
+		while (!testObjects.isEmpty()) {
+			Assert.assertEquals(testObjects.remove(), mailbox.tryTakeMail().get());
+			Assert.assertEquals(!testObjects.isEmpty(), mailbox.hasMail());
+			mailbox.waitUntilHasCapacity(); // should not block here because the mailbox is not full
+		}
+		// NOTE(review): exceptions thrown inside the helper threads below are not propagated to this thread by join().
+		Thread waitingReader = new Thread(ThrowingRunnable.unchecked(() -> mailbox.waitUntilHasMail()));
+		waitingReader.start();
+		Thread.sleep(1); // NOTE(review): timing-based; isAlive() alone cannot tell "waiting" from "not yet run"
+		Assert.assertTrue(waitingReader.isAlive());
+		mailbox.tryPutMail(() -> {}); // the mailbox is empty here, so this put is expected to succeed (result unchecked)
+		waitingReader.join(); // should complete here
+
+		while (mailbox.tryPutMail(() -> {})) {} // fill the mailbox up to capacity
+
+		Thread waitingWriter = new Thread(ThrowingRunnable.unchecked(() -> mailbox.waitUntilHasCapacity()));
+		waitingWriter.start();
+		Thread.sleep(1);
+		Assert.assertTrue(waitingWriter.isAlive());
+		mailbox.takeMail(); // frees one slot, which should release the waiting writer
+		waitingWriter.join();
+	}
+
+	/**
+	 * Tests the producer-consumer pattern using the blocking methods on the mailbox.
+	 */
+	@Test
+	public void testConcurrentPutTakeBlocking() throws Exception {
+		testPutTake(MailboxReceiver::takeMail, MailboxSender::putMail);
+	}
+
+	/**
+	 * Tests the producer-consumer pattern using the non-blocking methods and waits on the mailbox.
+	 */
+	@Test
+	public void testConcurrentPutTakeNonBlockingAndWait() throws Exception {
+		testPutTake((mailbox -> {
+				mailbox.waitUntilHasMail();
+				return mailbox.tryTakeMail().get();
+			}),
+			((mailbox, runnable) -> {
+				while (!mailbox.tryPutMail(runnable)) {
+					mailbox.waitUntilHasCapacity();
+				}
+			}));
+	}
+
+	/**
+	 * Tests the producer-consumer pattern through the mailbox with numThreads concurrent writers and one reader.
+	 */
+	private void testPutTake(
+		FunctionWithException<Mailbox, Runnable, Exception> takeMethod,
+		BiConsumerWithException<Mailbox, Runnable, Exception> putMethod) throws Exception {
+		final int numThreads = 10;
+		final int numLettersPerThread = 1000;
+		final int[] results = new int[numThreads]; // only mutated by the reader thread, which runs the letters
+		Thread[] writerThreads = new Thread[numThreads];
+		Thread readerThread = new Thread(ThrowingRunnable.unchecked(() -> {
+			Runnable letter;
+			while ((letter = takeMethod.apply(mailbox)) != POISON_LETTER) {
+				letter.run();
+			}
+		}));
+
+		readerThread.start();
+		for (int i = 0; i < writerThreads.length; ++i) {
+			final int threadId = i;
+			writerThreads[i] = new Thread(ThrowingRunnable.unchecked(() -> {
+				for (int k = 0; k < numLettersPerThread; ++k) {
+					putMethod.accept(mailbox, () -> ++results[threadId]);
+				}
+			}));
+		}
+
+		for (Thread writerThread : writerThreads) {
+			writerThread.start();
+		}
+
+		for (Thread writerThread : writerThreads) {
+			writerThread.join();
+		}
+
+		mailbox.putMail(POISON_LETTER); // all writers are done; tell the reader to stop
+
+		readerThread.join();
+		for (int perThreadResult : results) {
+			Assert.assertEquals(numLettersPerThread, perThreadResult);
+		}
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
index 230c68a..835d924 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
@@ -81,7 +81,9 @@ public class MockStreamTask extends StreamTask {
 	public void init() { }
 
 	@Override
-	protected void run() { }
+	protected void performDefaultAction(ActionContext context) throws Exception {
+		context.allActionsCompleted();
+	}
 
 	@Override
 	protected void cleanup() { }
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
index 986e410..57a5121 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
@@ -284,13 +284,14 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase {
 		}
 
 		@Override
-		protected void run() throws InterruptedException {
+		protected void performDefaultAction(ActionContext context) throws Exception {
 			final long taskIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask();
 			if (taskIndex == 0) {
 				numberOfRestarts.countDown();
 			}
 			invokeLatch.countDown();
 			finishLatch.await();
+			context.allActionsCompleted();
 		}
 
 		@Override
@@ -340,9 +341,10 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase {
 		}
 
 		@Override
-		protected void run() throws InterruptedException {
+		protected void performDefaultAction(ActionContext context) throws Exception {
 			invokeLatch.countDown();
 			finishLatch.await();
+			context.allActionsCompleted();
 		}
 
 		@Override
@@ -368,8 +370,8 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase {
 		}
 
 		@Override
-		protected void run() throws Exception {
-
+		protected void performDefaultAction(ActionContext context) throws Exception {
+			context.allActionsCompleted();
 		}
 
 		@Override


[flink] 02/02: [FLINK-12483][runtime] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks.

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ead9139680ea82c4fdfd1e9d19baf4d4a08ec845
Author: Stefan Richter <s....@data-artisans.com>
AuthorDate: Tue May 14 15:33:48 2019 +0200

    [FLINK-12483][runtime] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks.
    
    This closes #8442.
---
 .../streaming/runtime/tasks/SourceStreamTask.java  | 66 +++++++++++++++++++++-
 1 file changed, 64 insertions(+), 2 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index fd50a1a..e604f2c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -45,6 +45,8 @@ import org.apache.flink.util.FlinkException;
 public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
 	extends StreamTask<OUT, OP> {
 
+	private static final Runnable SOURCE_POISON_LETTER = () -> {}; // sentinel, compared by identity, that ends the mailbox loop
+
 	private volatile boolean externallyInducedCheckpoints;
 
 	public SourceStreamTask(Environment env) {
@@ -101,12 +103,43 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 	protected void performDefaultAction(ActionContext context) throws Exception {
 		// Against the usual contract of this method, this implementation is not step-wise but blocking instead for
 		// compatibility reasons with the current source interface (source functions run as a loop, not in steps).
-		headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
+		final LegacySourceFunctionThread sourceThread = new LegacySourceFunctionThread();
+		sourceThread.start(); // runs headOperator.run(...) outside of the task thread
+
+		// Run an alternative mailbox loop that never invokes default actions and runs each letter under the checkpoint lock.
+		try {
+			runAlternativeMailboxLoop();
+		} catch (Exception mailboxEx) {
+			// Cancel the source if any exception escaped the mailbox loop. NOTE(review): sourceThread is not joined here.
+			if (!isCanceled()) {
+				cancelTask();
+			}
+			throw mailboxEx;
+		}
+
+		sourceThread.join(); // join() also makes the throwable recorded by the source thread visible here
+		sourceThread.checkThrowSourceExecutionException();
+
 		context.allActionsCompleted();
 	}
 
+	private void runAlternativeMailboxLoop() throws InterruptedException {
+		// Processes letters until the source thread enqueues SOURCE_POISON_LETTER (see LegacySourceFunctionThread#run).
+		while (true) {
+			// Blocks until a letter is available.
+			Runnable letter = mailbox.takeMail();
+			if (letter == SOURCE_POISON_LETTER) {
+				break;
+			}
+			// Run each letter under the same checkpoint lock that is handed to the source function.
+			synchronized (getCheckpointLock()) {
+				letter.run();
+			}
+		}
+	}
+
 	@Override
-	protected void cancelTask() throws Exception {
+	protected void cancelTask() {
 		if (headOperator != null) {
 			headOperator.cancel();
 		}
@@ -133,4 +166,33 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 			}
 		}
 	}
+
+	/**
+	 * Thread that executes the source function of the head operator and then enqueues the source poison letter.
+	 */
+	private class LegacySourceFunctionThread extends Thread {
+		// Written by this thread; read via checkThrowSourceExecutionException() after join(), which makes it visible.
+		private Throwable sourceExecutionThrowable;
+
+		LegacySourceFunctionThread() {
+			this.sourceExecutionThrowable = null;
+		}
+		// Runs the source and, whether it succeeds or fails, enqueues the poison letter so the mailbox loop terminates.
+		@Override
+		public void run() {
+			try {
+				headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
+			} catch (Throwable t) {
+				sourceExecutionThrowable = t;
+			} finally {
+				mailbox.clearAndPut(SOURCE_POISON_LETTER);
+			}
+		}
+		// Rethrows a failure recorded by run(), wrapped in a new Exception; does nothing if the source succeeded.
+		void checkThrowSourceExecutionException() throws Exception {
+			if (sourceExecutionThrowable != null) {
+				throw new Exception(sourceExecutionThrowable);
+			}
+		}
+	}
 }