You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/08/08 08:31:34 UTC
[flink] 02/02: [hotfix][task,test] Do not override performDefaultAction in StreamTaskTest
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 434f59f53c8fe5cd47406472cd0ddf9051081f18
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Wed Jun 26 14:20:48 2019 +0200
[hotfix][task,test] Do not override performDefaultAction in StreamTaskTest
---
.../tasks/StreamTaskCancellationBarrierTest.java | 1 +
.../streaming/runtime/tasks/StreamTaskTest.java | 52 ++++++++++++++++------
2 files changed, 40 insertions(+), 13 deletions(-)
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
index 21427e0..92cf60b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -185,6 +185,7 @@ public class StreamTaskCancellationBarrierTest {
@Override
protected void init() throws Exception {
+ super.init();
synchronized (lock) {
while (running) {
lock.wait();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 949a75f..26c6faf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -108,6 +108,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
+import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext;
@@ -145,6 +146,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.apache.flink.streaming.util.StreamTaskUtil.waitTaskIsRunning;
+import static org.apache.flink.util.Preconditions.checkState;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
@@ -826,11 +828,8 @@ public class StreamTaskTest extends TestLogger {
}
@Override
- protected void init() throws Exception {}
-
- @Override
- protected void performDefaultAction(DefaultActionContext context) throws Exception {
- context.allActionsCompleted();
+ protected void init() throws Exception {
+ inputProcessor = new EmptyInputProcessor();
}
@Override
@@ -1019,7 +1018,6 @@ public class StreamTaskTest extends TestLogger {
private static class MockStreamTask extends StreamTask<String, AbstractStreamOperator<String>> {
private final OperatorChain<String, AbstractStreamOperator<String>> overrideOperatorChain;
- private volatile boolean inputFinished;
MockStreamTask(Environment env, OperatorChain<String, AbstractStreamOperator<String>> operatorChain, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
super(env, null, uncaughtExceptionHandler);
@@ -1033,20 +1031,47 @@ public class StreamTaskTest extends TestLogger {
// here for test purposes.
super.operatorChain = this.overrideOperatorChain;
super.headOperator = super.operatorChain.getHeadOperator();
+ super.inputProcessor = new EmptyInputProcessor(false);
+ }
+
+ void finishInput() {
+ checkState(inputProcessor != null, "Tried to finishInput before MockStreamTask was started");
+ ((EmptyInputProcessor) inputProcessor).finishInput();
+ }
+ }
+
+ private static class EmptyInputProcessor implements StreamInputProcessor {
+ private volatile boolean isFinished;
+
+ public EmptyInputProcessor() {
+ this(true);
+ }
+
+ public EmptyInputProcessor(boolean startFinished) {
+ isFinished = startFinished;
}
@Override
- protected void performDefaultAction(DefaultActionContext context) {
- if (isCanceled() || inputFinished) {
- context.allActionsCompleted();
- }
+ public boolean processInput() throws Exception {
+ return false;
}
@Override
- protected void cleanup() throws Exception {}
+ public void close() throws IOException {
+ }
- void finishInput() {
- this.inputFinished = true;
+ @Override
+ public boolean isFinished() {
+ return isFinished;
+ }
+
+ @Override
+ public CompletableFuture<?> isAvailable() {
+ return AVAILABLE;
+ }
+
+ public void finishInput() {
+ isFinished = true;
}
}
@@ -1262,6 +1287,7 @@ public class StreamTaskTest extends TestLogger {
@Override
protected void init() throws Exception {
+ super.init();
getProcessingTimeService().registerTimer(0, new ProcessingTimeCallback() {
@Override
public void onProcessingTime(long timestamp) throws Exception {