You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/03/21 14:11:26 UTC

[flink] branch master updated (75ad29c -> 7dafd1c)

This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 75ad29c  [FLINK-16472] support precision of timestamp and time data types
     new 8a1666e  [hotfix][task] Remove unnecessary fields and methods from StreamTask
     new 6937fd8  [hotfix][tests] Remove dead codes from SynchronousCheckpointTest
     new 7dafd1c  [FLINK-16690][tests] Refactor StreamTaskTest to reuse TestTaskBuilder and MockStreamTaskBuilder

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/runtime/taskmanager/TestTaskBuilder.java |   8 +-
 .../api/operators/AbstractStreamOperator.java      |   2 +-
 .../flink/streaming/runtime/tasks/StreamTask.java  |  14 --
 .../AbstractUdfStreamOperatorLifecycleTest.java    |  38 +--
 .../tasks/StreamTaskCancellationBarrierTest.java   |  35 ---
 .../streaming/runtime/tasks/StreamTaskTest.java    | 263 +++++++--------------
 .../flink/streaming/util/MockStreamTask.java       |  23 +-
 .../streaming/util/MockStreamTaskBuilder.java      |  22 +-
 8 files changed, 135 insertions(+), 270 deletions(-)


[flink] 03/03: [FLINK-16690][tests] Refactor StreamTaskTest to reuse TestTaskBuilder and MockStreamTaskBuilder

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7dafd1cd65ba8977d923482a810c907ed7d37b59
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Fri Mar 20 13:14:35 2020 +0800

    [FLINK-16690][tests] Refactor StreamTaskTest to reuse TestTaskBuilder and MockStreamTaskBuilder
    
    This closes #11459
---
 .../flink/runtime/taskmanager/TestTaskBuilder.java |   8 +-
 .../AbstractUdfStreamOperatorLifecycleTest.java    |  38 +--
 .../streaming/runtime/tasks/StreamTaskTest.java    | 263 +++++++--------------
 .../flink/streaming/util/MockStreamTask.java       |  13 +-
 .../streaming/util/MockStreamTaskBuilder.java      |  17 +-
 5 files changed, 134 insertions(+), 205 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
index 2c5f341..c05e075 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
@@ -80,6 +80,7 @@ public final class TestTaskBuilder {
 	private KvStateService kvStateService = new KvStateService(new KvStateRegistry(), null, null);
 	private Executor executor = TestingUtils.defaultExecutor();
 	private Configuration taskManagerConfig = new Configuration();
+	private Configuration taskConfig = new Configuration();
 	private ExecutionConfig executionConfig = new ExecutionConfig();
 	private Collection<PermanentBlobKey> requiredJarFileBlobKeys = Collections.emptyList();
 	private List<ResultPartitionDeploymentDescriptor> resultPartitions = Collections.emptyList();
@@ -132,6 +133,11 @@ public final class TestTaskBuilder {
 		return this;
 	}
 
+	public TestTaskBuilder setTaskConfig(Configuration taskConfig) {
+		this.taskConfig = taskConfig;
+		return this;
+	}
+
 	public TestTaskBuilder setExecutionConfig(ExecutionConfig executionConfig) {
 		this.executionConfig = executionConfig;
 		return this;
@@ -186,7 +192,7 @@ public final class TestTaskBuilder {
 			1,
 			1,
 			invokable.getName(),
-			new Configuration());
+			taskConfig);
 
 		final BlobCacheService blobCacheService = new BlobCacheService(
 			mock(PermanentBlobCache.class),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index cb24990..1c2240e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -25,7 +25,9 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.runtime.taskmanager.Task;
@@ -141,16 +143,18 @@ public class AbstractUdfStreamOperatorLifecycleTest {
 		cfg.setOperatorID(new OperatorID());
 		cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
-		Task task = StreamTaskTest.createTask(SourceStreamTask.class, cfg, taskManagerConfig);
+		try (ShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build()) {
+			Task task = StreamTaskTest.createTask(SourceStreamTask.class, shuffleEnvironment, cfg, taskManagerConfig);
 
-		task.startTaskThread();
+			task.startTaskThread();
 
-		LifecycleTrackingStreamSource.runStarted.await();
+			LifecycleTrackingStreamSource.runStarted.await();
 
-		// wait for clean termination
-		task.getExecutingThread().join();
-		assertEquals(ExecutionState.FINISHED, task.getExecutionState());
-		assertEquals(EXPECTED_CALL_ORDER_FULL, ACTUAL_ORDER_TRACKING);
+			// wait for clean termination
+			task.getExecutingThread().join();
+			assertEquals(ExecutionState.FINISHED, task.getExecutionState());
+			assertEquals(EXPECTED_CALL_ORDER_FULL, ACTUAL_ORDER_TRACKING);
+		}
 	}
 
 	@Test
@@ -164,18 +168,20 @@ public class AbstractUdfStreamOperatorLifecycleTest {
 		cfg.setOperatorID(new OperatorID());
 		cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
-		Task task = StreamTaskTest.createTask(SourceStreamTask.class, cfg, taskManagerConfig);
+		try (ShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build()) {
+			Task task = StreamTaskTest.createTask(SourceStreamTask.class, shuffleEnvironment, cfg, taskManagerConfig);
 
-		task.startTaskThread();
-		LifecycleTrackingStreamSource.runStarted.await();
+			task.startTaskThread();
+			LifecycleTrackingStreamSource.runStarted.await();
 
-		// this should cancel the task even though it is blocked on runFinished
-		task.cancelExecution();
+			// this should cancel the task even though it is blocked on runFinished
+			task.cancelExecution();
 
-		// wait for clean termination
-		task.getExecutingThread().join();
-		assertEquals(ExecutionState.CANCELED, task.getExecutionState());
-		assertEquals(EXPECTED_CALL_ORDER_CANCEL_RUNNING, ACTUAL_ORDER_TRACKING);
+			// wait for clean termination
+			task.getExecutingThread().join();
+			assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+			assertEquals(EXPECTED_CALL_ORDER_CANCEL_RUNNING, ACTUAL_ORDER_TRACKING);
+		}
 	}
 
 	private static class MockSourceFunction extends RichSourceFunction<Long> {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index dc9d4ea..1628e4c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
@@ -26,10 +25,6 @@ import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.blob.BlobCacheService;
-import org.apache.flink.runtime.blob.PermanentBlobCache;
-import org.apache.flink.runtime.blob.TransientBlobCache;
-import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -37,38 +32,23 @@ import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.TestingUncaughtExceptionHandler;
-import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.JobInformation;
-import org.apache.flink.runtime.executiongraph.TaskInformation;
-import org.apache.flink.runtime.filecache.FileCache;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
-import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.writer.AvailabilityTestResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -89,19 +69,14 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskLocalStateStoreImpl;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.TaskStateManagerImpl;
-import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.runtime.taskexecutor.KvStateService;
-import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
-import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
-import org.apache.flink.runtime.taskmanager.NoOpTaskOperatorEventGateway;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
+import org.apache.flink.runtime.taskmanager.TestTaskBuilder;
 import org.apache.flink.runtime.util.FatalExitExceptionHandler;
-import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -124,11 +99,11 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
 import org.apache.flink.streaming.util.MockStreamConfig;
+import org.apache.flink.streaming.util.MockStreamTaskBuilder;
 import org.apache.flink.streaming.util.TestSequentialReadingStreamOperator;
 import org.apache.flink.util.CloseableIterable;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.RunnableWithException;
 import org.apache.flink.util.function.SupplierWithException;
@@ -156,7 +131,6 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RunnableFuture;
@@ -234,23 +208,29 @@ public class StreamTaskTest extends TestLogger {
 		cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
 		final TaskManagerActions taskManagerActions = spy(new NoOpTaskManagerActions());
-		final Task task = createTask(SourceStreamTask.class, cfg, new Configuration(), taskManagerActions);
+		try (NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build()) {
+			final Task task =  new TestTaskBuilder(shuffleEnvironment)
+				.setInvokable(SourceStreamTask.class)
+				.setTaskConfig(cfg.getConfiguration())
+				.setTaskManagerActions(taskManagerActions)
+				.build();
 
-		final TaskExecutionState state = new TaskExecutionState(
-			task.getJobID(), task.getExecutionId(), ExecutionState.RUNNING);
+			final TaskExecutionState state = new TaskExecutionState(
+				task.getJobID(), task.getExecutionId(), ExecutionState.RUNNING);
 
-		task.startTaskThread();
+			task.startTaskThread();
 
-		verify(taskManagerActions, timeout(2000L)).updateTaskExecutionState(eq(state));
+			verify(taskManagerActions, timeout(2000L)).updateTaskExecutionState(eq(state));
 
-		// send a cancel. because the operator takes a long time to deserialize, this should
-		// hit the task before the operator is deserialized
-		task.cancelExecution();
+			// send a cancel. because the operator takes a long time to deserialize, this should
+			// hit the task before the operator is deserialized
+			task.cancelExecution();
 
-		task.getExecutingThread().join();
+			task.getExecutingThread().join();
 
-		assertFalse("Task did not cancel", task.getExecutingThread().isAlive());
-		assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+			assertFalse("Task did not cancel", task.getExecutingThread().isAlive());
+			assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+		}
 	}
 
 	@Test
@@ -265,24 +245,26 @@ public class StreamTaskTest extends TestLogger {
 		cfg.setStreamOperator(streamSource);
 		cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
-		Task task = createTask(StateBackendTestSource.class, cfg, taskManagerConfig);
+		try (ShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build()) {
+			Task task = createTask(StateBackendTestSource.class, shuffleEnvironment, cfg, taskManagerConfig);
 
-		StateBackendTestSource.fail = false;
-		task.startTaskThread();
+			StateBackendTestSource.fail = false;
+			task.startTaskThread();
 
-		// wait for clean termination
-		task.getExecutingThread().join();
+			// wait for clean termination
+			task.getExecutingThread().join();
 
-		// ensure that the state backends and stream iterables are closed ...
-		verify(TestStreamSource.operatorStateBackend).close();
-		verify(TestStreamSource.keyedStateBackend).close();
-		verify(TestStreamSource.rawOperatorStateInputs).close();
-		verify(TestStreamSource.rawKeyedStateInputs).close();
-		// ... and disposed
-		verify(TestStreamSource.operatorStateBackend).dispose();
-		verify(TestStreamSource.keyedStateBackend).dispose();
+			// ensure that the state backends and stream iterables are closed ...
+			verify(TestStreamSource.operatorStateBackend).close();
+			verify(TestStreamSource.keyedStateBackend).close();
+			verify(TestStreamSource.rawOperatorStateInputs).close();
+			verify(TestStreamSource.rawKeyedStateInputs).close();
+			// ... and disposed
+			verify(TestStreamSource.operatorStateBackend).dispose();
+			verify(TestStreamSource.keyedStateBackend).dispose();
 
-		assertEquals(ExecutionState.FINISHED, task.getExecutionState());
+			assertEquals(ExecutionState.FINISHED, task.getExecutionState());
+		}
 	}
 
 	@Test
@@ -297,24 +279,26 @@ public class StreamTaskTest extends TestLogger {
 		cfg.setStreamOperator(streamSource);
 		cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
-		Task task = createTask(StateBackendTestSource.class, cfg, taskManagerConfig);
+		try (NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build()) {
+			Task task = createTask(StateBackendTestSource.class, shuffleEnvironment, cfg, taskManagerConfig);
 
-		StateBackendTestSource.fail = true;
-		task.startTaskThread();
+			StateBackendTestSource.fail = true;
+			task.startTaskThread();
 
-		// wait for clean termination
-		task.getExecutingThread().join();
+			// wait for clean termination
+			task.getExecutingThread().join();
 
-		// ensure that the state backends and stream iterables are closed ...
-		verify(TestStreamSource.operatorStateBackend).close();
-		verify(TestStreamSource.keyedStateBackend).close();
-		verify(TestStreamSource.rawOperatorStateInputs).close();
-		verify(TestStreamSource.rawKeyedStateInputs).close();
-		// ... and disposed
-		verify(TestStreamSource.operatorStateBackend).dispose();
-		verify(TestStreamSource.keyedStateBackend).dispose();
+			// ensure that the state backends and stream iterables are closed ...
+			verify(TestStreamSource.operatorStateBackend).close();
+			verify(TestStreamSource.keyedStateBackend).close();
+			verify(TestStreamSource.rawOperatorStateInputs).close();
+			verify(TestStreamSource.rawKeyedStateInputs).close();
+			// ... and disposed
+			verify(TestStreamSource.operatorStateBackend).dispose();
+			verify(TestStreamSource.keyedStateBackend).dispose();
 
-		assertEquals(ExecutionState.FAILED, task.getExecutionState());
+			assertEquals(ExecutionState.FAILED, task.getExecutionState());
+		}
 	}
 
 	@Test
@@ -322,19 +306,22 @@ public class StreamTaskTest extends TestLogger {
 		syncLatch = new OneShotLatch();
 
 		StreamConfig cfg = new StreamConfig(new Configuration());
-		Task task = createTask(CancelFailingTask.class, cfg, new Configuration());
+		try (NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build()) {
+
+			Task task = createTask(CancelFailingTask.class, shuffleEnvironment, cfg, new Configuration());
 
-		// start the task and wait until it runs
-		// execution state RUNNING is not enough, we need to wait until the stream task's run() method
-		// is entered
-		task.startTaskThread();
-		syncLatch.await();
+			// start the task and wait until it runs
+			// execution state RUNNING is not enough, we need to wait until the stream task's run() method
+			// is entered
+			task.startTaskThread();
+			syncLatch.await();
 
-		// cancel the execution - this should lead to smooth shutdown
-		task.cancelExecution();
-		task.getExecutingThread().join();
+			// cancel the execution - this should lead to smooth shutdown
+			task.cancelExecution();
+			task.getExecutingThread().join();
 
-		assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+			assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+		}
 	}
 
 	/**
@@ -439,12 +426,15 @@ public class StreamTaskTest extends TestLogger {
 	@Test
 	public void testCancelTaskExceptionHandling() throws Exception {
 		StreamConfig cfg = new StreamConfig(new Configuration());
-		Task task = createTask(CancelThrowingTask.class, cfg, new Configuration());
 
-		task.startTaskThread();
-		task.getExecutingThread().join();
+		try (NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build()) {
+			Task task = createTask(CancelThrowingTask.class, shuffleEnvironment, cfg, new Configuration());
+
+			task.startTaskThread();
+			task.getExecutingThread().join();
 
-		assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+			assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+		}
 	}
 
 	/**
@@ -983,7 +973,9 @@ public class StreamTaskTest extends TestLogger {
 		try (final MockEnvironment environment = setupEnvironment(new boolean[] {true, true})) {
 			final int numberOfProcessCalls = 10;
 			final AvailabilityTestInputProcessor inputProcessor = new AvailabilityTestInputProcessor(numberOfProcessCalls);
-			final AvailabilityTestStreamTask task = new AvailabilityTestStreamTask<>(environment, inputProcessor);
+			final StreamTask task = new MockStreamTaskBuilder(environment)
+				.setStreamInputProcessor(inputProcessor)
+				.build();
 
 			task.invoke();
 			assertEquals(numberOfProcessCalls, inputProcessor.currentNumProcessCalls);
@@ -995,7 +987,9 @@ public class StreamTaskTest extends TestLogger {
 		try (final MockEnvironment environment = setupEnvironment(new boolean[] {true, false})) {
 			final int numberOfProcessCalls = 10;
 			final AvailabilityTestInputProcessor inputProcessor = new AvailabilityTestInputProcessor(numberOfProcessCalls);
-			final AvailabilityTestStreamTask task = new AvailabilityTestStreamTask<>(environment, inputProcessor);
+			final StreamTask task = new MockStreamTaskBuilder(environment)
+				.setStreamInputProcessor(inputProcessor)
+				.build();
 			final MailboxExecutor executor = task.mailboxProcessor.getMainMailboxExecutor();
 
 			final RunnableWithException completeFutureTask = () -> {
@@ -1129,22 +1123,6 @@ public class StreamTaskTest extends TestLogger {
 	}
 
 	/**
-	 * A stream task implementation used to construct a specific {@link StreamInputProcessor} for tests.
-	 */
-	private static class AvailabilityTestStreamTask<T, OP extends StreamOperator<T>> extends StreamTask<T, OP> {
-
-		AvailabilityTestStreamTask(Environment environment, StreamInputProcessor inputProcessor) {
-			super(environment);
-
-			this.inputProcessor = inputProcessor;
-		}
-
-		@Override
-		protected void init() {
-		}
-	}
-
-	/**
 	 * A stream input processor implementation used to control the returned input status based on
 	 * the total number of processing calls.
 	 */
@@ -1197,85 +1175,16 @@ public class StreamTaskTest extends TestLogger {
 	}
 
 	public static Task createTask(
-		Class<? extends AbstractInvokable> invokable,
-		StreamConfig taskConfig,
-		Configuration taskManagerConfig) throws Exception {
-		return createTask(invokable, taskConfig, taskManagerConfig, new TestTaskStateManager(), mock(TaskManagerActions.class));
-	}
-
-	public static Task createTask(
-		Class<? extends AbstractInvokable> invokable,
-		StreamConfig taskConfig,
-		Configuration taskManagerConfig,
-		TaskManagerActions taskManagerActions) throws Exception {
-		return createTask(invokable, taskConfig, taskManagerConfig, new TestTaskStateManager(), taskManagerActions);
-	}
-
-	public static Task createTask(
 			Class<? extends AbstractInvokable> invokable,
+			ShuffleEnvironment shuffleEnvironment,
 			StreamConfig taskConfig,
-			Configuration taskManagerConfig,
-			TestTaskStateManager taskStateManager,
-			TaskManagerActions taskManagerActions) throws Exception {
-
-		BlobCacheService blobService =
-			new BlobCacheService(mock(PermanentBlobCache.class), mock(TransientBlobCache.class));
-
-		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
-		when(libCache.getClassLoader(any(JobID.class))).thenReturn(StreamTaskTest.class.getClassLoader());
-
-		ResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier();
-		PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
-		Executor executor = mock(Executor.class);
-
-		ShuffleEnvironment<?, ?> shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();
-
-		JobInformation jobInformation = new JobInformation(
-			new JobID(),
-			"Job Name",
-			new SerializedValue<>(new ExecutionConfig()),
-			new Configuration(),
-			Collections.emptyList(),
-			Collections.emptyList());
-
-		TaskInformation taskInformation = new TaskInformation(
-			new JobVertexID(),
-			"Test Task",
-			1,
-			1,
-			invokable.getName(),
-			taskConfig.getConfiguration());
-
-		return new Task(
-			jobInformation,
-			taskInformation,
-			new ExecutionAttemptID(),
-			new AllocationID(),
-			0,
-			0,
-			Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-			Collections.<InputGateDeploymentDescriptor>emptyList(),
-			0,
-			mock(MemoryManager.class),
-			mock(IOManager.class),
-			shuffleEnvironment,
-			new KvStateService(new KvStateRegistry(), null, null),
-			mock(BroadcastVariableManager.class),
-			new TaskEventDispatcher(),
-			taskStateManager,
-			taskManagerActions,
-			mock(InputSplitProvider.class),
-			mock(CheckpointResponder.class),
-			new NoOpTaskOperatorEventGateway(),
-			new TestGlobalAggregateManager(),
-			blobService,
-			libCache,
-			mock(FileCache.class),
-			new TestingTaskManagerRuntimeInfo(taskManagerConfig, new String[] {System.getProperty("java.io.tmpdir")}),
-			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
-			consumableNotifier,
-			partitionProducerStateChecker,
-			executor);
+			Configuration taskManagerConfig) throws Exception {
+
+		return new TestTaskBuilder(shuffleEnvironment)
+			.setTaskManagerConfig(taskManagerConfig)
+			.setInvokable(invokable)
+			.setTaskConfig(taskConfig.getConfiguration())
+			.build();
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
index d190924..77e2904 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
@@ -26,13 +26,13 @@ import org.apache.flink.runtime.util.FatalExitExceptionHandler;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
+import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceFactory;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
 import org.apache.flink.streaming.runtime.tasks.TimerService;
-import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
 import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
 
 import java.util.function.BiConsumer;
@@ -66,8 +66,9 @@ public class MockStreamTask<OUT, OP extends StreamOperator<OUT>> extends StreamT
 		TimerService timerService,
 		BiConsumer<String, Throwable> handleAsyncException,
 		TaskMailbox taskMailbox,
-		StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor taskActionExecutor
-	) {
+		StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor taskActionExecutor,
+		StreamInputProcessor inputProcessor) {
+
 		super(environment, timerService, FatalExitExceptionHandler.INSTANCE, taskActionExecutor, taskMailbox);
 		this.name = name;
 		this.checkpointLock = checkpointLock;
@@ -79,6 +80,7 @@ public class MockStreamTask<OUT, OP extends StreamOperator<OUT>> extends StreamT
 		this.checkpointStorage = checkpointStorage;
 		this.processingTimeService = timerService;
 		this.handleAsyncException = handleAsyncException;
+		this.inputProcessor = inputProcessor;
 	}
 
 	@Override
@@ -86,11 +88,6 @@ public class MockStreamTask<OUT, OP extends StreamOperator<OUT>> extends StreamT
 	}
 
 	@Override
-	protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
-		controller.allActionsCompleted();
-	}
-
-	@Override
 	protected void cleanup() {
 		mailboxProcessor.allActionsCompleted();
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java
index 680fac0..946809e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.CheckpointStorage;
@@ -31,6 +30,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.MockStreamStatusMaintainer;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
+import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
@@ -38,6 +38,8 @@ import org.apache.flink.streaming.runtime.tasks.TimerService;
 import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
 import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
 
+import javax.annotation.Nullable;
+
 import java.util.function.BiConsumer;
 
 /**
@@ -47,7 +49,7 @@ public class MockStreamTaskBuilder {
 	private final Environment environment;
 	private String name = "Mock Task";
 	private Object checkpointLock = new Object();
-	private StreamConfig config = new StreamConfig(new Configuration());
+	private StreamConfig config;
 	private ExecutionConfig executionConfig = new ExecutionConfig();
 	private CloseableRegistry closableRegistry = new CloseableRegistry();
 	private StreamStatusMaintainer streamStatusMaintainer = new MockStreamStatusMaintainer();
@@ -57,9 +59,12 @@ public class MockStreamTaskBuilder {
 	private BiConsumer<String, Throwable> handleAsyncException = (message, throwable) -> { };
 	private TaskMailbox taskMailbox = new TaskMailboxImpl();
 	private StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor taskActionExecutor = StreamTaskActionExecutor.synchronizedExecutor();
+	@Nullable
+	private StreamInputProcessor inputProcessor;
 
 	public MockStreamTaskBuilder(Environment environment) throws Exception {
 		this.environment = environment;
+		this.config = new StreamConfig(environment.getTaskConfiguration());
 
 		StateBackend stateBackend = new MemoryStateBackend();
 		this.checkpointStorage = stateBackend.createCheckpointStorage(new JobID());
@@ -126,6 +131,11 @@ public class MockStreamTaskBuilder {
 		return this;
 	}
 
+	public MockStreamTaskBuilder setStreamInputProcessor(StreamInputProcessor inputProcessor) {
+		this.inputProcessor = inputProcessor;
+		return this;
+	}
+
 	public MockStreamTask build() {
 		return new MockStreamTask(
 			environment,
@@ -140,6 +150,7 @@ public class MockStreamTaskBuilder {
 			timerService,
 			handleAsyncException,
 			taskMailbox,
-			taskActionExecutor);
+			taskActionExecutor,
+			inputProcessor);
 	}
 }


[flink] 01/03: [hotfix][task] Remove unnecessary fields and methods from StreamTask

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8a1666e8678ca79bf57ab1279ea2dfc9f6f8e8a7
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Thu Mar 19 17:21:50 2020 +0800

    [hotfix][task] Remove unnecessary fields and methods from StreamTask
    
    This closes #11459
---
 .../streaming/api/operators/AbstractStreamOperator.java    |  2 +-
 .../apache/flink/streaming/runtime/tasks/StreamTask.java   | 14 --------------
 .../org/apache/flink/streaming/util/MockStreamTask.java    | 10 ----------
 .../apache/flink/streaming/util/MockStreamTaskBuilder.java |  5 -----
 4 files changed, 1 insertion(+), 30 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index f876d3c..532c57f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -227,7 +227,7 @@ public abstract class AbstractStreamOperator<OUT>
 				LatencyStats.Granularity.SINGLE);
 		}
 
-		this.runtimeContext = new StreamingRuntimeContext(this, environment, container.getAccumulatorMap());
+		this.runtimeContext = new StreamingRuntimeContext(this, environment, environment.getAccumulatorRegistry().getUserMap());
 
 		stateKeySelector1 = config.getStatePartitioner(0, getUserCodeClassloader());
 		stateKeySelector2 = config.getStatePartitioner(1, getUserCodeClassloader());
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index aabd9c5..82651ac 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.Counter;
@@ -44,7 +43,6 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
-import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -184,9 +182,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 	private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
 
-	/** The map of user-defined accumulators of this task. */
-	private final Map<String, Accumulator<?, ?>> accumulatorMap;
-
 	/** The currently active background materialization threads. */
 	private final CloseableRegistry cancelables = new CloseableRegistry();
 
@@ -272,7 +267,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		this.timerService = timerService;
 		this.uncaughtExceptionHandler = Preconditions.checkNotNull(uncaughtExceptionHandler);
 		this.configuration = new StreamConfig(getTaskConfiguration());
-		this.accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap();
 		this.recordWriter = createRecordWriterDelegate(configuration, environment);
 		this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
 		this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);
@@ -687,18 +681,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		return configuration;
 	}
 
-	public Map<String, Accumulator<?, ?>> getAccumulatorMap() {
-		return accumulatorMap;
-	}
-
 	public StreamStatusMaintainer getStreamStatusMaintainer() {
 		return operatorChain;
 	}
 
-	public OperatorEventDispatcher getOperatorEventDispatcher() {
-		return operatorChain.getOperatorEventDispatcher();
-	}
-
 	RecordWriterOutput<?>[] getStreamOutputs() {
 		return operatorChain.getStreamOutputs();
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
index ffd4a50..d190924 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
@@ -36,7 +35,6 @@ import org.apache.flink.streaming.runtime.tasks.TimerService;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
 import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
 
-import java.util.Map;
 import java.util.function.BiConsumer;
 
 /**
@@ -54,7 +52,6 @@ public class MockStreamTask<OUT, OP extends StreamOperator<OUT>> extends StreamT
 	private final CheckpointStorageWorkerView checkpointStorage;
 	private final ProcessingTimeService processingTimeService;
 	private final BiConsumer<String, Throwable> handleAsyncException;
-	private final Map<String, Accumulator<?, ?>> accumulatorMap;
 
 	public MockStreamTask(
 		Environment environment,
@@ -68,7 +65,6 @@ public class MockStreamTask<OUT, OP extends StreamOperator<OUT>> extends StreamT
 		CheckpointStorageWorkerView checkpointStorage,
 		TimerService timerService,
 		BiConsumer<String, Throwable> handleAsyncException,
-		Map<String, Accumulator<?, ?>> accumulatorMap,
 		TaskMailbox taskMailbox,
 		StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor taskActionExecutor
 	) {
@@ -83,7 +79,6 @@ public class MockStreamTask<OUT, OP extends StreamOperator<OUT>> extends StreamT
 		this.checkpointStorage = checkpointStorage;
 		this.processingTimeService = timerService;
 		this.handleAsyncException = handleAsyncException;
-		this.accumulatorMap = accumulatorMap;
 	}
 
 	@Override
@@ -159,11 +154,6 @@ public class MockStreamTask<OUT, OP extends StreamOperator<OUT>> extends StreamT
 	}
 
 	@Override
-	public Map<String, Accumulator<?, ?>> getAccumulatorMap() {
-		return accumulatorMap;
-	}
-
-	@Override
 	public ProcessingTimeServiceFactory getProcessingTimeServiceFactory() {
 		return mailboxExecutor -> processingTimeService;
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java
index 904c5e9..680fac0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.execution.Environment;
@@ -39,8 +38,6 @@ import org.apache.flink.streaming.runtime.tasks.TimerService;
 import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
 import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
 
-import java.util.Collections;
-import java.util.Map;
 import java.util.function.BiConsumer;
 
 /**
@@ -58,7 +55,6 @@ public class MockStreamTaskBuilder {
 	private TimerService timerService = new TestProcessingTimeService();
 	private StreamTaskStateInitializer streamTaskStateInitializer;
 	private BiConsumer<String, Throwable> handleAsyncException = (message, throwable) -> { };
-	private Map<String, Accumulator<?, ?>> accumulatorMap = Collections.emptyMap();
 	private TaskMailbox taskMailbox = new TaskMailboxImpl();
 	private StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor taskActionExecutor = StreamTaskActionExecutor.synchronizedExecutor();
 
@@ -143,7 +139,6 @@ public class MockStreamTaskBuilder {
 			checkpointStorage,
 			timerService,
 			handleAsyncException,
-			accumulatorMap,
 			taskMailbox,
 			taskActionExecutor);
 	}


[flink] 02/03: [hotfix][tests] Remove dead codes from SynchronousCheckpointTest

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6937fd82e9811847321a7d341de486a428afd1bd
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Thu Mar 19 16:15:56 2020 +0800

    [hotfix][tests] Remove dead codes from SynchronousCheckpointTest
    
    This closes #11459
---
 .../tasks/StreamTaskCancellationBarrierTest.java   | 35 ----------------------
 1 file changed, 35 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
index 6d55866..52433d7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -21,17 +21,14 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
-import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
 import org.apache.flink.streaming.runtime.io.CheckpointBarrierAlignerTestBase;
 import org.apache.flink.streaming.runtime.io.CheckpointBarrierAlignerTestBase.CheckpointExceptionMatcher;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -145,38 +142,6 @@ public class StreamTaskCancellationBarrierTest {
 		testHarness.waitForTaskCompletion();
 	}
 
-	// ------------------------------------------------------------------------
-	//  test tasks / functions
-	// ------------------------------------------------------------------------
-
-	private static class InitBlockingTask extends NoOpStreamTask<String, AbstractStreamOperator<String>> {
-
-		private final Object lock = new Object();
-		private volatile boolean running = true;
-
-		protected InitBlockingTask(Environment env) {
-			super(env);
-		}
-
-		@Override
-		protected void init() throws Exception {
-			super.init();
-			synchronized (lock) {
-				while (running) {
-					lock.wait();
-				}
-			}
-		}
-
-		@Override
-		protected void cancelTask() throws Exception {
-			running = false;
-			synchronized (lock) {
-				lock.notifyAll();
-			}
-		}
-	}
-
 	private static class IdentityMap implements MapFunction<String, String> {
 		private static final long serialVersionUID = 1L;