You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/08/08 08:31:32 UTC

[flink] branch master updated (2979a31 -> 434f59f)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 2979a31  [FLINK-13594][python] Improve the 'from_element' method of flink python api to apply to blink planner.
     new 04b5cbf  [FLINK-12479][operators] Integrate StreamInputProcessor with mailbox
     new 434f59f  [hotfix][task,test] Do not override performDefaultAction in StreamTaskTest

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../streaming/runtime/io/StreamInputProcessor.java |   7 +-
 .../runtime/io/StreamOneInputProcessor.java        |  43 ++++---
 .../runtime/io/StreamTwoInputProcessor.java        |  21 +++-
 .../io/StreamTwoInputSelectableProcessor.java      | 125 ++++++++++-----------
 .../flink/streaming/runtime/tasks/StreamTask.java  |   9 +-
 .../tasks/StreamTaskCancellationBarrierTest.java   |   1 +
 .../tasks/StreamTaskSelectiveReadingTest.java      |  10 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    |  52 ++++++---
 8 files changed, 162 insertions(+), 106 deletions(-)


[flink] 01/02: [FLINK-12479][operators] Integrate StreamInputProcessor with mailbox

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 04b5cbf07775c086b1df33f94b77a99acc3f4615
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Jun 21 11:57:24 2019 +0200

    [FLINK-12479][operators] Integrate StreamInputProcessor with mailbox
---
 .../streaming/runtime/io/StreamInputProcessor.java |   7 +-
 .../runtime/io/StreamOneInputProcessor.java        |  43 ++++---
 .../runtime/io/StreamTwoInputProcessor.java        |  21 +++-
 .../io/StreamTwoInputSelectableProcessor.java      | 125 ++++++++++-----------
 .../flink/streaming/runtime/tasks/StreamTask.java  |   9 +-
 .../tasks/StreamTaskSelectiveReadingTest.java      |  10 +-
 6 files changed, 122 insertions(+), 93 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 0b263d0..1de31bf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.io.AvailabilityListener;
 
 import java.io.Closeable;
 
@@ -26,9 +27,11 @@ import java.io.Closeable;
  * Interface for processing records by {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
  */
 @Internal
-public interface StreamInputProcessor extends Closeable {
+public interface StreamInputProcessor extends AvailabilityListener, Closeable {
 	/**
-	 * @return true if {@link StreamTaskInput} is finished.
+	 * @return true if {@link StreamInputProcessor} estimates that more records can be processed
+	 * immediately. Otherwise false, which means that there are no more records available at the
+	 * moment and the caller should check {@link #isFinished()} and/or {@link #isAvailable()}.
 	 */
 	boolean processInput() throws Exception;
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java
index 4be6d54..3be72ee 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java
@@ -43,6 +43,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -129,29 +130,29 @@ public final class StreamOneInputProcessor<IN> implements StreamInputProcessor {
 	}
 
 	@Override
+	public boolean isFinished() {
+		return input.isFinished();
+	}
+
+	@Override
+	public CompletableFuture<?> isAvailable() {
+		return input.isAvailable();
+	}
+
+	@Override
 	public boolean processInput() throws Exception {
 		initializeNumRecordsIn();
 
 		StreamElement recordOrMark = input.pollNextNullable();
-		if (recordOrMark == null) {
-			input.isAvailable().get();
-			return !checkFinished();
-		}
-		int channel = input.getLastChannel();
-		checkState(channel != StreamTaskInput.UNSPECIFIED);
+		if (recordOrMark != null) {
+			int channel = input.getLastChannel();
+			checkState(channel != StreamTaskInput.UNSPECIFIED);
 
-		processElement(recordOrMark, channel);
-		return true;
-	}
-
-	private boolean checkFinished() throws Exception {
-		boolean isFinished = input.isFinished();
-		if (isFinished) {
-			synchronized (lock) {
-				operatorChain.endInput(1);
-			}
+			processElement(recordOrMark, channel);
 		}
-		return isFinished;
+		checkFinished();
+
+		return recordOrMark != null;
 	}
 
 	private void processElement(StreamElement recordOrMark, int channel) throws Exception {
@@ -180,6 +181,14 @@ public final class StreamOneInputProcessor<IN> implements StreamInputProcessor {
 		}
 	}
 
+	private void checkFinished() throws Exception {
+		if (input.isFinished()) {
+			synchronized (lock) {
+				operatorChain.endInput(1);
+			}
+		}
+	}
+
 	private void initializeNumRecordsIn() {
 		if (numRecordsIn == null) {
 			try {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 00bdab9..c888477 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -56,6 +56,7 @@ import java.io.IOException;
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -207,6 +208,19 @@ public final class StreamTwoInputProcessor<IN1, IN2> implements StreamInputProce
 	}
 
 	@Override
+	public boolean isFinished() {
+		return isFinished;
+	}
+
+	@Override
+	public CompletableFuture<?> isAvailable() {
+		if (currentRecordDeserializer != null) {
+			return AVAILABLE;
+		}
+		return barrierHandler.isAvailable();
+	}
+
+	@Override
 	public boolean processInput() throws Exception {
 		if (isFinished) {
 			return false;
@@ -259,7 +273,6 @@ public final class StreamTwoInputProcessor<IN1, IN2> implements StreamInputProce
 								streamOperator.processElement1(record);
 							}
 							return true;
-
 						}
 					}
 					else {
@@ -295,15 +308,13 @@ public final class StreamTwoInputProcessor<IN1, IN2> implements StreamInputProce
 			if (bufferOrEvent.isPresent()) {
 				processBufferOrEvent(bufferOrEvent.get());
 			} else {
-				if (!barrierHandler.isFinished()) {
-					barrierHandler.isAvailable().get();
-				} else {
+				if (barrierHandler.isFinished()) {
 					isFinished = true;
 					if (!barrierHandler.isEmpty()) {
 						throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
 					}
-					return false;
 				}
+				return false;
 			}
 		}
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
index 65464db..4adc2dc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
@@ -47,7 +47,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -156,7 +155,21 @@ public final class StreamTwoInputSelectableProcessor<IN1, IN2> implements Stream
 		this.lastReadInputIndex = 1; // always try to read from the first input
 
 		this.isPrepared = false;
+	}
 
+	@Override
+	public boolean isFinished() {
+		return input1.isFinished() && input2.isFinished();
+	}
+
+	@Override
+	public CompletableFuture<?> isAvailable() {
+		if (inputSelection.isALLMaskOf2()) {
+			return isAnyInputAvailable();
+		} else {
+			StreamTaskInput input = (inputSelection.getInputMask() == InputSelection.FIRST.getInputMask()) ? input1 : input2;
+			return input.isAvailable();
+		}
 	}
 
 	@Override
@@ -179,18 +192,29 @@ public final class StreamTwoInputSelectableProcessor<IN1, IN2> implements Stream
 			if (recordOrMark != null) {
 				processElement1(recordOrMark, input1.getLastChannel());
 			}
+			checkFinished(input1, lastReadInputIndex);
 		} else {
 			recordOrMark = input2.pollNextNullable();
 			if (recordOrMark != null) {
 				processElement2(recordOrMark, input2.getLastChannel());
 			}
+			checkFinished(input2, lastReadInputIndex);
 		}
 
 		if (recordOrMark == null) {
 			setUnavailableInput(readingInputIndex);
 		}
 
-		return !checkFinished();
+		return recordOrMark != null;
+	}
+
+	private void checkFinished(StreamTaskInput input, int inputIndex) throws Exception {
+		if (input.isFinished()) {
+			synchronized (lock) {
+				operatorChain.endInput(getInputId(inputIndex));
+				inputSelection = inputSelector.nextSelection();
+			}
+		}
 	}
 
 	@Override
@@ -213,14 +237,12 @@ public final class StreamTwoInputSelectableProcessor<IN1, IN2> implements Stream
 		}
 	}
 
-	private int selectNextReadingInputIndex()
-		throws InterruptedException, ExecutionException, IOException {
-
-		int readingInputIndex;
-		while ((readingInputIndex = inputSelection.fairSelectNextIndexOutOf2(availableInputsMask, lastReadInputIndex)) == -1) {
-			if (!waitForAvailableInput(inputSelection)) {
-				return -1;
-			}
+	private int selectNextReadingInputIndex() throws IOException {
+		updateAvailability();
+		checkInputSelectionAgainstIsFinished();
+		int readingInputIndex = inputSelection.fairSelectNextIndexOutOf2(availableInputsMask, lastReadInputIndex);
+		if (readingInputIndex == -1) {
+			return -1;
 		}
 
 		// to avoid starvation, if the input selection is ALL and availableInputsMask is not ALL,
@@ -234,6 +256,27 @@ public final class StreamTwoInputSelectableProcessor<IN1, IN2> implements Stream
 		return readingInputIndex;
 	}
 
+	private void checkInputSelectionAgainstIsFinished() throws IOException {
+		if (inputSelection.isALLMaskOf2()) {
+			return;
+		}
+		if (inputSelection.isInputSelected(1) && input1.isFinished()) {
+			throw new IOException("Can not make a progress: only first input is selected but it is already finished");
+		}
+		if (inputSelection.isInputSelected(2) && input2.isFinished()) {
+			throw new IOException("Can not make a progress: only second input is selected but it is already finished");
+		}
+	}
+
+	private void updateAvailability() {
+		if (!input1.isFinished() && input1.isAvailable() == AVAILABLE) {
+			setAvailableInput(input1.getInputIndex());
+		}
+		if (!input2.isFinished() && input2.isAvailable() == AVAILABLE) {
+			setAvailableInput(input2.getInputIndex());
+		}
+	}
+
 	private void processElement1(StreamElement recordOrMark, int channel) throws Exception {
 		if (recordOrMark.isRecord()) {
 			StreamRecord<IN1> record = recordOrMark.asRecord();
@@ -304,64 +347,20 @@ public final class StreamTwoInputSelectableProcessor<IN1, IN2> implements Stream
 		}
 	}
 
-	/**
-	 * @return false if both of the inputs are finished, true otherwise.
-	 */
-	private boolean waitForAvailableInput(InputSelection inputSelection)
-		throws ExecutionException, InterruptedException, IOException {
-
-		if (inputSelection.isALLMaskOf2()) {
-			return waitForAvailableEitherInput();
-		} else {
-			waitForOneInput(
-				(inputSelection.getInputMask() == InputSelection.FIRST.getInputMask()) ? input1 : input2);
-			return true;
+	private CompletableFuture<?> isAnyInputAvailable() {
+		if (input1.isFinished()) {
+			return input2.isFinished() ? AVAILABLE : input2.isAvailable();
 		}
-	}
-
-	private boolean waitForAvailableEitherInput()
-		throws ExecutionException, InterruptedException {
 
-		CompletableFuture<?> future1 = input1.isFinished() ? UNAVAILABLE : input1.isAvailable();
-		CompletableFuture<?> future2 = input2.isFinished() ? UNAVAILABLE : input2.isAvailable();
-
-		if (future1 == UNAVAILABLE && future2 == UNAVAILABLE) {
-			return false;
+		if (input2.isFinished()) {
+			return input1.isAvailable();
 		}
 
-		// block to wait for a available input
-		CompletableFuture.anyOf(future1, future2).get();
-
-		if (future1.isDone()) {
-			setAvailableInput(input1.getInputIndex());
-		}
-		if (future2.isDone()) {
-			setAvailableInput(input2.getInputIndex());
-		}
+		CompletableFuture<?> input1Available = input1.isAvailable();
+		CompletableFuture<?> input2Available = input2.isAvailable();
 
-		return true;
-	}
-
-	private void waitForOneInput(StreamTaskInput input)
-		throws IOException, ExecutionException, InterruptedException {
-
-		if (input.isFinished()) {
-			throw new IOException("Could not read the finished input: input" + (input.getInputIndex() + 1) +  ".");
-		}
-
-		input.isAvailable().get();
-		setAvailableInput(input.getInputIndex());
-	}
-
-	private boolean checkFinished() throws Exception {
-		if (getInput(lastReadInputIndex).isFinished()) {
-			synchronized (lock) {
-				operatorChain.endInput(getInputId(lastReadInputIndex));
-				inputSelection = inputSelector.nextSelection();
-			}
-		}
-
-		return input1.isFinished() && input2.isFinished();
+		return (input1Available == AVAILABLE || input2Available == AVAILABLE) ?
+			AVAILABLE : CompletableFuture.anyOf(input1Available, input2Available);
 	}
 
 	private void setAvailableInput(int inputIndex) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 7960a2f..53142a6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -61,6 +61,7 @@ import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext;
 import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor;
 import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxProcessor;
+import org.apache.flink.streaming.runtime.tasks.mailbox.execution.SuspendedMailboxDefaultAction;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -267,7 +268,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	 */
 	protected void performDefaultAction(DefaultActionContext context) throws Exception {
 		if (!inputProcessor.processInput()) {
-			context.allActionsCompleted();
+			if (inputProcessor.isFinished()) {
+				context.allActionsCompleted();
+			}
+			else {
+				SuspendedMailboxDefaultAction suspendedDefaultAction = context.suspendDefaultAction();
+				inputProcessor.isAvailable().thenRun(suspendedDefaultAction::resume);
+			}
 		}
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java
index c7c1440..16e5bd1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java
@@ -39,7 +39,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -104,13 +103,14 @@ public class StreamTaskSelectiveReadingTest {
 	}
 
 	@Test
-	public void testReadFinishedInput() {
+	public void testReadFinishedInput() throws Exception {
 		try {
 			testBase(new TestReadFinishedInputStreamOperator(), false, new ConcurrentLinkedQueue<>(), true);
 			fail("should throw an IOException");
-		} catch (Throwable t) {
-			assertTrue("wrong exception, should be IOException",
-				ExceptionUtils.findThrowableWithMessage(t, "Could not read the finished input: input1").isPresent());
+		} catch (Exception t) {
+			if (!ExceptionUtils.findThrowableWithMessage(t, "only first input is selected but it is already finished").isPresent()) {
+				throw t;
+			}
 		}
 	}
 


[flink] 02/02: [hotfix][task, test] Do not override performDefaultAction in StreamTaskTest

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 434f59f53c8fe5cd47406472cd0ddf9051081f18
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Wed Jun 26 14:20:48 2019 +0200

    [hotfix][task,test] Do not override performDefaultAction in StreamTaskTest
---
 .../tasks/StreamTaskCancellationBarrierTest.java   |  1 +
 .../streaming/runtime/tasks/StreamTaskTest.java    | 52 ++++++++++++++++------
 2 files changed, 40 insertions(+), 13 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
index 21427e0..92cf60b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -185,6 +185,7 @@ public class StreamTaskCancellationBarrierTest {
 
 		@Override
 		protected void init() throws Exception {
+			super.init();
 			synchronized (lock) {
 				while (running) {
 					lock.wait();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 949a75f..26c6faf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -108,6 +108,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
+import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext;
@@ -145,6 +146,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.streaming.util.StreamTaskUtil.waitTaskIsRunning;
+import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasSize;
@@ -826,11 +828,8 @@ public class StreamTaskTest extends TestLogger {
 		}
 
 		@Override
-		protected void init() throws Exception {}
-
-		@Override
-		protected void performDefaultAction(DefaultActionContext context) throws Exception {
-			context.allActionsCompleted();
+		protected void init() throws Exception {
+			inputProcessor = new EmptyInputProcessor();
 		}
 
 		@Override
@@ -1019,7 +1018,6 @@ public class StreamTaskTest extends TestLogger {
 	private static class MockStreamTask extends StreamTask<String, AbstractStreamOperator<String>> {
 
 		private final OperatorChain<String, AbstractStreamOperator<String>> overrideOperatorChain;
-		private volatile boolean inputFinished;
 
 		MockStreamTask(Environment env, OperatorChain<String, AbstractStreamOperator<String>> operatorChain, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
 			super(env, null, uncaughtExceptionHandler);
@@ -1033,20 +1031,47 @@ public class StreamTaskTest extends TestLogger {
 			// here for test purposes.
 			super.operatorChain = this.overrideOperatorChain;
 			super.headOperator = super.operatorChain.getHeadOperator();
+			super.inputProcessor = new EmptyInputProcessor(false);
+		}
+
+		void finishInput() {
+			checkState(inputProcessor != null, "Tried to finishInput before MockStreamTask was started");
+			((EmptyInputProcessor) inputProcessor).finishInput();
+		}
+	}
+
+	private static class EmptyInputProcessor implements StreamInputProcessor {
+		private volatile boolean isFinished;
+
+		public EmptyInputProcessor() {
+			this(true);
+		}
+
+		public EmptyInputProcessor(boolean startFinished) {
+			isFinished = startFinished;
 		}
 
 		@Override
-		protected void performDefaultAction(DefaultActionContext context) {
-			if (isCanceled() || inputFinished) {
-				context.allActionsCompleted();
-			}
+		public boolean processInput() throws Exception {
+			return false;
 		}
 
 		@Override
-		protected void cleanup() throws Exception {}
+		public void close() throws IOException {
+		}
 
-		void finishInput() {
-			this.inputFinished = true;
+		@Override
+		public boolean isFinished() {
+			return isFinished;
+		}
+
+		@Override
+		public CompletableFuture<?> isAvailable() {
+			return AVAILABLE;
+		}
+
+		public void finishInput() {
+			isFinished = true;
 		}
 	}
 
@@ -1262,6 +1287,7 @@ public class StreamTaskTest extends TestLogger {
 
 		@Override
 		protected void init() throws Exception {
+			super.init();
 			getProcessingTimeService().registerTimer(0, new ProcessingTimeCallback() {
 				@Override
 				public void onProcessingTime(long timestamp) throws Exception {