You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/04 11:30:22 UTC
[flink] 04/06: [hotfix][operator] Deduplicate StreamTask code
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6caa61b13e9a6bfeaa27e7be2c97ff099730a5e3
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Jun 21 13:10:04 2019 +0200
[hotfix][operator] Deduplicate StreamTask code
---
.../runtime/tasks/OneInputStreamTask.java | 4 ---
.../flink/streaming/runtime/tasks/StreamTask.java | 3 +-
.../tasks/TwoInputSelectableStreamTask.java | 5 ----
.../runtime/tasks/TwoInputStreamTask.java | 5 ----
.../tasks/StreamTaskCancellationBarrierTest.java | 11 ++------
.../runtime/tasks/StreamTaskTerminationTest.java | 4 ---
.../streaming/runtime/tasks/StreamTaskTest.java | 25 ++---------------
.../runtime/tasks/SynchronousCheckpointITCase.java | 4 ---
.../runtime/tasks/SynchronousCheckpointTest.java | 14 ++--------
.../tasks/TaskCheckpointingBehaviourTest.java | 3 --
.../flink/streaming/util/MockStreamTask.java | 3 --
.../jobmaster/JobMasterStopWithSavepointIT.java | 32 +---------------------
12 files changed, 11 insertions(+), 102 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index f787571..e0b328c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -110,8 +110,4 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
inputProcessor.cleanup();
}
}
-
- @Override
- protected void cancelTask() {
- }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index e5f56b2..d333b8d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -234,7 +234,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
protected abstract void cleanup() throws Exception;
- protected abstract void cancelTask() throws Exception;
+ protected void cancelTask() throws Exception {
+ }
/**
* This method implements the default action of the task (e.g. processing one event from the input). Implementations
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java
index cde5a5a..6c11abf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java
@@ -76,9 +76,4 @@ public class TwoInputSelectableStreamTask<IN1, IN2, OUT> extends AbstractTwoInpu
inputProcessor.cleanup();
}
}
-
- @Override
- protected void cancelTask() {
-
- }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 4670c0b..569cf16 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -76,9 +76,4 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends AbstractTwoInputStreamTas
inputProcessor.cleanup();
}
}
-
- @Override
- protected void cancelTask() {
-
- }
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
index 456aea5..21427e0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.co.CoStreamMap;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierAlignerTestBase;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierAlignerTestBase.CheckpointExceptionMatcher;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask;
import org.junit.Test;
@@ -173,7 +174,7 @@ public class StreamTaskCancellationBarrierTest {
// test tasks / functions
// ------------------------------------------------------------------------
- private static class InitBlockingTask extends StreamTask<String, AbstractStreamOperator<String>> {
+ private static class InitBlockingTask extends NoOpStreamTask<String, AbstractStreamOperator<String>> {
private final Object lock = new Object();
private volatile boolean running = true;
@@ -192,14 +193,6 @@ public class StreamTaskCancellationBarrierTest {
}
@Override
- protected void performDefaultAction(ActionContext context) throws Exception {
- context.allActionsCompleted();
- }
-
- @Override
- protected void cleanup() throws Exception {}
-
- @Override
protected void cancelTask() throws Exception {
running = false;
synchronized (lock) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index db3d657..72e8a19 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -241,10 +241,6 @@ public class StreamTaskTerminationTest extends TestLogger {
// wait until all async checkpoint threads are terminated, so that no more exceptions can be reported
Assert.assertTrue(getAsyncOperationsThreadPool().awaitTermination(30L, TimeUnit.SECONDS));
}
-
- @Override
- protected void cancelTask() {
- }
}
private static class NoOpStreamOperator<T> extends AbstractStreamOperator<T> {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 46b5389..4ed23c7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -805,9 +805,6 @@ public class StreamTaskTest extends TestLogger {
@Override
protected void cleanup() throws Exception {}
-
- @Override
- protected void cancelTask() throws Exception {}
}
private static class BlockingCloseStreamOperator extends AbstractStreamOperator<Void> {
@@ -1007,9 +1004,6 @@ public class StreamTaskTest extends TestLogger {
@Override
protected void cleanup() throws Exception {}
- @Override
- protected void cancelTask() throws Exception {}
-
void finishInput() {
this.inputFinished = true;
}
@@ -1045,9 +1039,6 @@ public class StreamTaskTest extends TestLogger {
protected void cleanup() throws Exception {}
@Override
- protected void cancelTask() throws Exception {}
-
- @Override
public StreamTaskStateInitializer createStreamTaskStateInitializer() {
final StreamTaskStateInitializer streamTaskStateManager = super.createStreamTaskStateInitializer();
return (operatorID, operatorClassName, keyContext, keySerializer, closeableRegistry, metricGroup) -> {
@@ -1212,12 +1203,12 @@ public class StreamTaskTest extends TestLogger {
/**
* A task that register a processing time service callback.
*/
- public static class TimeServiceTask extends StreamTask<String, AbstractStreamOperator<String>> {
+ public static class TimeServiceTask extends NoOpStreamTask<String, AbstractStreamOperator<String>> {
private final List<ClassLoader> classLoaders = Collections.synchronizedList(new ArrayList<>());
public TimeServiceTask(Environment env) {
- super(env, null);
+ super(env);
}
public List<ClassLoader> getClassLoaders() {
@@ -1238,17 +1229,7 @@ public class StreamTaskTest extends TestLogger {
@Override
protected void performDefaultAction(ActionContext context) throws Exception {
syncLatch.await();
- context.allActionsCompleted();
- }
-
- @Override
- protected void cleanup() throws Exception {
-
- }
-
- @Override
- protected void cancelTask() throws Exception {
-
+ super.performDefaultAction(context);
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
index 00860b6..bac9d43 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
@@ -164,10 +164,6 @@ public class SynchronousCheckpointITCase {
}
@Override
- protected void cancelTask() {
- }
-
- @Override
public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) throws Exception {
eventQueue.put(Event.PRE_TRIGGER_CHECKPOINT);
boolean result = super.triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
index f06cfd2..8b71423 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask;
import org.junit.Before;
import org.junit.Test;
@@ -155,7 +156,7 @@ public class SynchronousCheckpointTest {
return new StreamTaskUnderTest(environment, runningLatch, execLatch);
}
- private static class StreamTaskUnderTest extends StreamTask {
+ private static class StreamTaskUnderTest extends NoOpStreamTask {
private final OneShotLatch runningLatch;
private final OneShotLatch execLatch;
@@ -170,19 +171,10 @@ public class SynchronousCheckpointTest {
}
@Override
- protected void init() {}
-
- @Override
protected void performDefaultAction(ActionContext context) throws Exception {
runningLatch.trigger();
execLatch.await();
- context.allActionsCompleted();
+ super.performDefaultAction(context);
}
-
- @Override
- protected void cleanup() {}
-
- @Override
- protected void cancelTask() {}
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index 589b64c..e40e23d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -490,8 +490,5 @@ public class TaskCheckpointingBehaviourTest extends TestLogger {
@Override
protected void cleanup() {}
-
- @Override
- protected void cancelTask() {}
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
index 835d924..37e7328 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
@@ -89,9 +89,6 @@ public class MockStreamTask extends StreamTask {
protected void cleanup() { }
@Override
- protected void cancelTask() { }
-
- @Override
public String getName() {
return name;
}
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
index 1e383f8..d54ec1f 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask;
import org.apache.flink.test.util.AbstractTestBase;
import org.junit.Assume;
@@ -354,35 +355,4 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase {
finishLatch.trigger();
}
}
-
- /**
- * A {@link StreamTask} that does nothing.
- * This exists only to avoid having to implement all abstract methods in the subclasses above.
- */
- public static class NoOpStreamTask extends StreamTask {
-
- NoOpStreamTask(final Environment env) {
- super(env);
- }
-
- @Override
- protected void init() {
-
- }
-
- @Override
- protected void performDefaultAction(ActionContext context) throws Exception {
- context.allActionsCompleted();
- }
-
- @Override
- protected void cleanup() {
-
- }
-
- @Override
- protected void cancelTask() throws Exception {
-
- }
- }
}