You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/04 11:30:22 UTC

[flink] 04/06: [hotfix][operator] Deduplicate StreamTask code

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6caa61b13e9a6bfeaa27e7be2c97ff099730a5e3
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Jun 21 13:10:04 2019 +0200

    [hotfix][operator] Deduplicate StreamTask code
---
 .../runtime/tasks/OneInputStreamTask.java          |  4 ---
 .../flink/streaming/runtime/tasks/StreamTask.java  |  3 +-
 .../tasks/TwoInputSelectableStreamTask.java        |  5 ----
 .../runtime/tasks/TwoInputStreamTask.java          |  5 ----
 .../tasks/StreamTaskCancellationBarrierTest.java   | 11 ++------
 .../runtime/tasks/StreamTaskTerminationTest.java   |  4 ---
 .../streaming/runtime/tasks/StreamTaskTest.java    | 25 ++---------------
 .../runtime/tasks/SynchronousCheckpointITCase.java |  4 ---
 .../runtime/tasks/SynchronousCheckpointTest.java   | 14 ++--------
 .../tasks/TaskCheckpointingBehaviourTest.java      |  3 --
 .../flink/streaming/util/MockStreamTask.java       |  3 --
 .../jobmaster/JobMasterStopWithSavepointIT.java    | 32 +---------------------
 12 files changed, 11 insertions(+), 102 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index f787571..e0b328c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -110,8 +110,4 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 			inputProcessor.cleanup();
 		}
 	}
-
-	@Override
-	protected void cancelTask() {
-	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index e5f56b2..d333b8d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -234,7 +234,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 	protected abstract void cleanup() throws Exception;
 
-	protected abstract void cancelTask() throws Exception;
+	protected void cancelTask() throws Exception {
+	}
 
 	/**
 	 * This method implements the default action of the task (e.g. processing one event from the input). Implementations
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java
index cde5a5a..6c11abf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java
@@ -76,9 +76,4 @@ public class TwoInputSelectableStreamTask<IN1, IN2, OUT> extends AbstractTwoInpu
 			inputProcessor.cleanup();
 		}
 	}
-
-	@Override
-	protected void cancelTask() {
-
-	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 4670c0b..569cf16 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -76,9 +76,4 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends AbstractTwoInputStreamTas
 			inputProcessor.cleanup();
 		}
 	}
-
-	@Override
-	protected void cancelTask() {
-
-	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
index 456aea5..21427e0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
 import org.apache.flink.streaming.runtime.io.CheckpointBarrierAlignerTestBase;
 import org.apache.flink.streaming.runtime.io.CheckpointBarrierAlignerTestBase.CheckpointExceptionMatcher;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask;
 
 import org.junit.Test;
 
@@ -173,7 +174,7 @@ public class StreamTaskCancellationBarrierTest {
 	//  test tasks / functions
 	// ------------------------------------------------------------------------
 
-	private static class InitBlockingTask extends StreamTask<String, AbstractStreamOperator<String>> {
+	private static class InitBlockingTask extends NoOpStreamTask<String, AbstractStreamOperator<String>> {
 
 		private final Object lock = new Object();
 		private volatile boolean running = true;
@@ -192,14 +193,6 @@ public class StreamTaskCancellationBarrierTest {
 		}
 
 		@Override
-		protected void performDefaultAction(ActionContext context) throws Exception {
-			context.allActionsCompleted();
-		}
-
-		@Override
-		protected void cleanup() throws Exception {}
-
-		@Override
 		protected void cancelTask() throws Exception {
 			running = false;
 			synchronized (lock) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index db3d657..72e8a19 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -241,10 +241,6 @@ public class StreamTaskTerminationTest extends TestLogger {
 			// wait until all async checkpoint threads are terminated, so that no more exceptions can be reported
 			Assert.assertTrue(getAsyncOperationsThreadPool().awaitTermination(30L, TimeUnit.SECONDS));
 		}
-
-		@Override
-		protected void cancelTask() {
-		}
 	}
 
 	private static class NoOpStreamOperator<T> extends AbstractStreamOperator<T> {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 46b5389..4ed23c7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -805,9 +805,6 @@ public class StreamTaskTest extends TestLogger {
 
 		@Override
 		protected void cleanup() throws Exception {}
-
-		@Override
-		protected void cancelTask() throws Exception {}
 	}
 
 	private static class BlockingCloseStreamOperator extends AbstractStreamOperator<Void> {
@@ -1007,9 +1004,6 @@ public class StreamTaskTest extends TestLogger {
 		@Override
 		protected void cleanup() throws Exception {}
 
-		@Override
-		protected void cancelTask() throws Exception {}
-
 		void finishInput() {
 			this.inputFinished = true;
 		}
@@ -1045,9 +1039,6 @@ public class StreamTaskTest extends TestLogger {
 		protected void cleanup() throws Exception {}
 
 		@Override
-		protected void cancelTask() throws Exception {}
-
-		@Override
 		public StreamTaskStateInitializer createStreamTaskStateInitializer() {
 			final StreamTaskStateInitializer streamTaskStateManager = super.createStreamTaskStateInitializer();
 			return (operatorID, operatorClassName, keyContext, keySerializer, closeableRegistry, metricGroup) -> {
@@ -1212,12 +1203,12 @@ public class StreamTaskTest extends TestLogger {
 	/**
 	 * A task that register a processing time service callback.
 	 */
-	public static class TimeServiceTask extends StreamTask<String, AbstractStreamOperator<String>> {
+	public static class TimeServiceTask extends NoOpStreamTask<String, AbstractStreamOperator<String>> {
 
 		private final List<ClassLoader> classLoaders = Collections.synchronizedList(new ArrayList<>());
 
 		public TimeServiceTask(Environment env) {
-			super(env, null);
+			super(env);
 		}
 
 		public List<ClassLoader> getClassLoaders() {
@@ -1238,17 +1229,7 @@ public class StreamTaskTest extends TestLogger {
 		@Override
 		protected void performDefaultAction(ActionContext context) throws Exception {
 			syncLatch.await();
-			context.allActionsCompleted();
-		}
-
-		@Override
-		protected void cleanup() throws Exception {
-
-		}
-
-		@Override
-		protected void cancelTask() throws Exception {
-
+			super.performDefaultAction(context);
 		}
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
index 00860b6..bac9d43 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
@@ -164,10 +164,6 @@ public class SynchronousCheckpointITCase {
 		}
 
 		@Override
-		protected void cancelTask() {
-		}
-
-		@Override
 		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) throws Exception {
 			eventQueue.put(Event.PRE_TRIGGER_CHECKPOINT);
 			boolean result = super.triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
index f06cfd2..8b71423 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -155,7 +156,7 @@ public class SynchronousCheckpointTest {
 		return new StreamTaskUnderTest(environment, runningLatch, execLatch);
 	}
 
-	private static class StreamTaskUnderTest extends StreamTask {
+	private static class StreamTaskUnderTest extends NoOpStreamTask {
 
 		private final OneShotLatch runningLatch;
 		private final OneShotLatch execLatch;
@@ -170,19 +171,10 @@ public class SynchronousCheckpointTest {
 		}
 
 		@Override
-		protected void init() {}
-
-		@Override
 		protected void performDefaultAction(ActionContext context) throws Exception {
 			runningLatch.trigger();
 			execLatch.await();
-			context.allActionsCompleted();
+			super.performDefaultAction(context);
 		}
-
-		@Override
-		protected void cleanup() {}
-
-		@Override
-		protected void cancelTask() {}
 	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index 589b64c..e40e23d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -490,8 +490,5 @@ public class TaskCheckpointingBehaviourTest extends TestLogger {
 
 		@Override
 		protected void cleanup() {}
-
-		@Override
-		protected void cancelTask() {}
 	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
index 835d924..37e7328 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
@@ -89,9 +89,6 @@ public class MockStreamTask extends StreamTask {
 	protected void cleanup() { }
 
 	@Override
-	protected void cancelTask() { }
-
-	@Override
 	public String getName() {
 		return name;
 	}
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
index 1e383f8..d54ec1f 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask;
 import org.apache.flink.test.util.AbstractTestBase;
 
 import org.junit.Assume;
@@ -354,35 +355,4 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase {
 			finishLatch.trigger();
 		}
 	}
-
-	/**
-	 * A {@link StreamTask} that does nothing.
-	 * This exists only to avoid having to implement all abstract methods in the subclasses above.
-	 */
-	public static class NoOpStreamTask extends StreamTask {
-
-		NoOpStreamTask(final Environment env) {
-			super(env);
-		}
-
-		@Override
-		protected void init() {
-
-		}
-
-		@Override
-		protected void performDefaultAction(ActionContext context) throws Exception {
-			context.allActionsCompleted();
-		}
-
-		@Override
-		protected void cleanup() {
-
-		}
-
-		@Override
-		protected void cancelTask() throws Exception {
-
-		}
-	}
 }