You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/08/08 08:31:34 UTC

[flink] 02/02: [hotfix][task,test] Do not override performDefaultAction in StreamTaskTest

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 434f59f53c8fe5cd47406472cd0ddf9051081f18
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Wed Jun 26 14:20:48 2019 +0200

    [hotfix][task,test] Do not override performDefaultAction in StreamTaskTest
---
 .../tasks/StreamTaskCancellationBarrierTest.java   |  1 +
 .../streaming/runtime/tasks/StreamTaskTest.java    | 52 ++++++++++++++++------
 2 files changed, 40 insertions(+), 13 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
index 21427e0..92cf60b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -185,6 +185,7 @@ public class StreamTaskCancellationBarrierTest {
 
 		@Override
 		protected void init() throws Exception {
+			super.init();
 			synchronized (lock) {
 				while (running) {
 					lock.wait();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 949a75f..26c6faf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -108,6 +108,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
+import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext;
@@ -145,6 +146,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.streaming.util.StreamTaskUtil.waitTaskIsRunning;
+import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasSize;
@@ -826,11 +828,8 @@ public class StreamTaskTest extends TestLogger {
 		}
 
 		@Override
-		protected void init() throws Exception {}
-
-		@Override
-		protected void performDefaultAction(DefaultActionContext context) throws Exception {
-			context.allActionsCompleted();
+		protected void init() throws Exception {
+			inputProcessor = new EmptyInputProcessor();
 		}
 
 		@Override
@@ -1019,7 +1018,6 @@ public class StreamTaskTest extends TestLogger {
 	private static class MockStreamTask extends StreamTask<String, AbstractStreamOperator<String>> {
 
 		private final OperatorChain<String, AbstractStreamOperator<String>> overrideOperatorChain;
-		private volatile boolean inputFinished;
 
 		MockStreamTask(Environment env, OperatorChain<String, AbstractStreamOperator<String>> operatorChain, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
 			super(env, null, uncaughtExceptionHandler);
@@ -1033,20 +1031,47 @@ public class StreamTaskTest extends TestLogger {
 			// here for test purposes.
 			super.operatorChain = this.overrideOperatorChain;
 			super.headOperator = super.operatorChain.getHeadOperator();
+			super.inputProcessor = new EmptyInputProcessor(false);
+		}
+
+		void finishInput() {
+			checkState(inputProcessor != null, "Tried to finishInput before MockStreamTask was started");
+			((EmptyInputProcessor) inputProcessor).finishInput();
+		}
+	}
+
+	private static class EmptyInputProcessor implements StreamInputProcessor {
+		private volatile boolean isFinished;
+
+		public EmptyInputProcessor() {
+			this(true);
+		}
+
+		public EmptyInputProcessor(boolean startFinished) {
+			isFinished = startFinished;
 		}
 
 		@Override
-		protected void performDefaultAction(DefaultActionContext context) {
-			if (isCanceled() || inputFinished) {
-				context.allActionsCompleted();
-			}
+		public boolean processInput() throws Exception {
+			return false;
 		}
 
 		@Override
-		protected void cleanup() throws Exception {}
+		public void close() throws IOException {
+		}
 
-		void finishInput() {
-			this.inputFinished = true;
+		@Override
+		public boolean isFinished() {
+			return isFinished;
+		}
+
+		@Override
+		public CompletableFuture<?> isAvailable() {
+			return AVAILABLE;
+		}
+
+		public void finishInput() {
+			isFinished = true;
 		}
 	}
 
@@ -1262,6 +1287,7 @@ public class StreamTaskTest extends TestLogger {
 
 		@Override
 		protected void init() throws Exception {
+			super.init();
 			getProcessingTimeService().registerTimer(0, new ProcessingTimeCallback() {
 				@Override
 				public void onProcessingTime(long timestamp) throws Exception {