You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/27 18:30:06 UTC

[flink] 01/04: [hotfix][tests] Pull TestTaskBuilder out of TaskTest

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4ccd335175a1ffa18a6e4cb9f1067f37ec7535ff
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Jul 26 19:02:34 2019 +0200

    [hotfix][tests] Pull TestTaskBuilder out of TaskTest
---
 .../apache/flink/runtime/taskmanager/TaskTest.java | 234 +-------------------
 .../flink/runtime/taskmanager/TestTaskBuilder.java | 240 +++++++++++++++++++++
 2 files changed, 249 insertions(+), 225 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 5879f52..167c988 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -19,19 +19,14 @@
 package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.PermanentBlobCache;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
-import org.apache.flink.runtime.blob.TransientBlobCache;
 import org.apache.flink.runtime.blob.VoidBlobStore;
-import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -41,42 +36,22 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.JobInformation;
-import org.apache.flink.runtime.executiongraph.TaskInformation;
-import org.apache.flink.runtime.filecache.FileCache;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
-import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
-import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteChannelStateChecker;
-import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
-import org.apache.flink.runtime.state.TestTaskStateManager;
-import org.apache.flink.runtime.taskexecutor.KvStateService;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
-import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
-import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.WrappingRuntimeException;
 
@@ -89,7 +64,6 @@ import org.junit.rules.TemporaryFolder;
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.Collections;
@@ -97,7 +71,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -107,10 +80,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -148,6 +120,7 @@ public class TaskTest extends TestLogger {
 	public void testRegularExecution() throws Exception {
 		final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
 		final Task task = createTaskBuilder()
+			.setInvokable(TestInvokableCorrect.class)
 			.setTaskManagerActions(taskManagerActions)
 			.build();
 
@@ -307,7 +280,7 @@ public class TaskTest extends TestLogger {
 		final PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
 
 		final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
-		final Task task = new TaskBuilder(shuffleEnvironment)
+		final Task task = new TestTaskBuilder(shuffleEnvironment)
 			.setTaskManagerActions(taskManagerActions)
 			.setConsumableNotifier(consumableNotifier)
 			.setPartitionProducerStateChecker(partitionProducerStateChecker)
@@ -608,7 +581,7 @@ public class TaskTest extends TestLogger {
 
 		int producingStateCounter = 0;
 		for (ExecutionState state : ExecutionState.values()) {
-			setState(task, initialTaskState);
+			TestTaskBuilder.setTaskState(task, initialTaskState);
 
 			if (checker.isProducerReadyOrAbortConsumption(task.new PartitionProducerStateResponseHandle(state, null))) {
 				producingStateCounter++;
@@ -651,7 +624,7 @@ public class TaskTest extends TestLogger {
 				.setPartitionProducerStateChecker(partitionChecker)
 				.setExecutor(Executors.directExecutor())
 				.build();
-			setState(task, ExecutionState.RUNNING);
+			TestTaskBuilder.setTaskState(task, ExecutionState.RUNNING);
 
 			final CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
 			when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
@@ -675,7 +648,7 @@ public class TaskTest extends TestLogger {
 				.setPartitionProducerStateChecker(partitionChecker)
 				.setExecutor(Executors.directExecutor())
 				.build();
-			setState(task, ExecutionState.RUNNING);
+			TestTaskBuilder.setTaskState(task, ExecutionState.RUNNING);
 
 			final CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
 			when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
@@ -1013,197 +986,8 @@ public class TaskTest extends TestLogger {
 	//  helper functions
 	// ------------------------------------------------------------------------
 
-	private void setInputGate(Task task, SingleInputGate inputGate) {
-		try {
-			Field f = Task.class.getDeclaredField("inputGates");
-			f.setAccessible(true);
-			f.set(task, new SingleInputGate[]{inputGate});
-
-			Map<IntermediateDataSetID, SingleInputGate> byId = new HashMap<>(1);
-			byId.put(inputGate.getConsumedResultId(), inputGate);
-
-			f = Task.class.getDeclaredField("inputGatesById");
-			f.setAccessible(true);
-			f.set(task, byId);
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Modifying the task state failed", e);
-		}
-	}
-
-	private void setState(Task task, ExecutionState state) {
-		try {
-			Field f = Task.class.getDeclaredField("executionState");
-			f.setAccessible(true);
-			f.set(task, state);
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Modifying the task state failed", e);
-		}
-	}
-
-	private TaskBuilder createTaskBuilder() {
-		return new TaskBuilder(shuffleEnvironment);
-	}
-
-	private static final class TaskBuilder {
-		private Class<? extends AbstractInvokable> invokable;
-		private TaskManagerActions taskManagerActions;
-		private LibraryCacheManager libraryCacheManager;
-		private ResultPartitionConsumableNotifier consumableNotifier;
-		private PartitionProducerStateChecker partitionProducerStateChecker;
-		private final ShuffleEnvironment<?, ?> shuffleEnvironment;
-		private KvStateService kvStateService;
-		private Executor executor;
-		private Configuration taskManagerConfig;
-		private ExecutionConfig executionConfig;
-		private Collection<PermanentBlobKey> requiredJarFileBlobKeys;
-		private Collection<ResultPartitionDeploymentDescriptor> resultPartitions = Collections.emptyList();
-		private Collection<InputGateDeploymentDescriptor> inputGates = Collections.emptyList();
-
-		{
-			invokable = TestInvokableCorrect.class;
-			taskManagerActions = mock(TaskManagerActions.class);
-
-			libraryCacheManager = mock(LibraryCacheManager.class);
-			when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
-
-			consumableNotifier = new NoOpResultPartitionConsumableNotifier();
-			partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
-
-			kvStateService = new KvStateService(new KvStateRegistry(), null, null);
-
-			executor = TestingUtils.defaultExecutor();
-
-			taskManagerConfig = new Configuration();
-			executionConfig = new ExecutionConfig();
-
-			requiredJarFileBlobKeys = Collections.emptyList();
-		}
-
-		private TaskBuilder(ShuffleEnvironment<?, ?> shuffleEnvironment) {
-			this.shuffleEnvironment = Preconditions.checkNotNull(shuffleEnvironment);
-		}
-
-		TaskBuilder setInvokable(Class<? extends AbstractInvokable> invokable) {
-			this.invokable = invokable;
-			return this;
-		}
-
-		TaskBuilder setTaskManagerActions(TaskManagerActions taskManagerActions) {
-			this.taskManagerActions = taskManagerActions;
-			return this;
-		}
-
-		TaskBuilder setLibraryCacheManager(LibraryCacheManager libraryCacheManager) {
-			this.libraryCacheManager = libraryCacheManager;
-			return this;
-		}
-
-		TaskBuilder setConsumableNotifier(ResultPartitionConsumableNotifier consumableNotifier) {
-			this.consumableNotifier = consumableNotifier;
-			return this;
-		}
-
-		TaskBuilder setPartitionProducerStateChecker(PartitionProducerStateChecker partitionProducerStateChecker) {
-			this.partitionProducerStateChecker = partitionProducerStateChecker;
-			return this;
-		}
-
-		TaskBuilder setKvStateService(KvStateService kvStateService) {
-			this.kvStateService = kvStateService;
-			return this;
-		}
-
-		TaskBuilder setExecutor(Executor executor) {
-			this.executor = executor;
-			return this;
-		}
-
-		TaskBuilder setTaskManagerConfig(Configuration taskManagerConfig) {
-			this.taskManagerConfig = taskManagerConfig;
-			return this;
-		}
-
-		TaskBuilder setExecutionConfig(ExecutionConfig executionConfig) {
-			this.executionConfig = executionConfig;
-			return this;
-		}
-
-		TaskBuilder setRequiredJarFileBlobKeys(Collection<PermanentBlobKey> requiredJarFileBlobKeys) {
-			this.requiredJarFileBlobKeys = requiredJarFileBlobKeys;
-			return this;
-		}
-
-		public TaskBuilder setResultPartitions(Collection<ResultPartitionDeploymentDescriptor> resultPartitions) {
-			this.resultPartitions = resultPartitions;
-			return this;
-		}
-
-		public TaskBuilder setInputGates(Collection<InputGateDeploymentDescriptor> inputGates) {
-			this.inputGates = inputGates;
-			return this;
-		}
-
-		private Task build() throws Exception {
-			final JobID jobId = new JobID();
-			final JobVertexID jobVertexId = new JobVertexID();
-			final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
-
-			final SerializedValue<ExecutionConfig> serializedExecutionConfig = new SerializedValue<>(executionConfig);
-
-			final JobInformation jobInformation = new JobInformation(
-				jobId,
-				"Test Job",
-				serializedExecutionConfig,
-				new Configuration(),
-				requiredJarFileBlobKeys,
-				Collections.emptyList());
-
-			final TaskInformation taskInformation = new TaskInformation(
-				jobVertexId,
-				"Test Task",
-				1,
-				1,
-				invokable.getName(),
-				new Configuration());
-
-			final BlobCacheService blobCacheService = new BlobCacheService(
-				mock(PermanentBlobCache.class),
-				mock(TransientBlobCache.class));
-
-			final TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
-
-			return new Task(
-				jobInformation,
-				taskInformation,
-				executionAttemptId,
-				new AllocationID(),
-				0,
-				0,
-				resultPartitions,
-				inputGates,
-				0,
-				mock(MemoryManager.class),
-				mock(IOManager.class),
-				shuffleEnvironment,
-				kvStateService,
-				mock(BroadcastVariableManager.class),
-				new TaskEventDispatcher(),
-				new TestTaskStateManager(),
-				taskManagerActions,
-				new MockInputSplitProvider(),
-				new TestCheckpointResponder(),
-				new TestGlobalAggregateManager(),
-				blobCacheService,
-				libraryCacheManager,
-				mock(FileCache.class),
-				new TestingTaskManagerRuntimeInfo(taskManagerConfig),
-				taskMetricGroup,
-				consumableNotifier,
-				partitionProducerStateChecker,
-				executor);
-		}
+	private TestTaskBuilder createTaskBuilder() {
+		return new TestTaskBuilder(shuffleEnvironment);
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
new file mode 100644
index 0000000..97cff7b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobCacheService;
+import org.apache.flink.runtime.blob.PermanentBlobCache;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobCache;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.taskexecutor.KvStateService;
+import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
+import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.Executor;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Utility that helps build {@link Task} objects for testing.
+ */
+public final class TestTaskBuilder {
+	private Class<? extends AbstractInvokable> invokable;
+	private TaskManagerActions taskManagerActions;
+	private LibraryCacheManager libraryCacheManager;
+	private ResultPartitionConsumableNotifier consumableNotifier;
+	private PartitionProducerStateChecker partitionProducerStateChecker;
+	private final ShuffleEnvironment<?, ?> shuffleEnvironment;
+	private KvStateService kvStateService;
+	private Executor executor;
+	private Configuration taskManagerConfig;
+	private ExecutionConfig executionConfig;
+	private Collection<PermanentBlobKey> requiredJarFileBlobKeys;
+	private Collection<ResultPartitionDeploymentDescriptor> resultPartitions = Collections.emptyList();
+	private Collection<InputGateDeploymentDescriptor> inputGates = Collections.emptyList();
+
+	{
+		invokable = AbstractInvokable.class;
+		taskManagerActions = mock(TaskManagerActions.class);
+
+		libraryCacheManager = mock(LibraryCacheManager.class);
+		when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
+
+		consumableNotifier = new NoOpResultPartitionConsumableNotifier();
+		partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
+
+		kvStateService = new KvStateService(new KvStateRegistry(), null, null);
+
+		executor = TestingUtils.defaultExecutor();
+
+		taskManagerConfig = new Configuration();
+		executionConfig = new ExecutionConfig();
+
+		requiredJarFileBlobKeys = Collections.emptyList();
+	}
+
+	public TestTaskBuilder(ShuffleEnvironment<?, ?> shuffleEnvironment) {
+		this.shuffleEnvironment = Preconditions.checkNotNull(shuffleEnvironment);
+	}
+
+	public TestTaskBuilder setInvokable(Class<? extends AbstractInvokable> invokable) {
+		this.invokable = invokable;
+		return this;
+	}
+
+	public TestTaskBuilder setTaskManagerActions(TaskManagerActions taskManagerActions) {
+		this.taskManagerActions = taskManagerActions;
+		return this;
+	}
+
+	public TestTaskBuilder setLibraryCacheManager(LibraryCacheManager libraryCacheManager) {
+		this.libraryCacheManager = libraryCacheManager;
+		return this;
+	}
+
+	public TestTaskBuilder setConsumableNotifier(ResultPartitionConsumableNotifier consumableNotifier) {
+		this.consumableNotifier = consumableNotifier;
+		return this;
+	}
+
+	public TestTaskBuilder setPartitionProducerStateChecker(PartitionProducerStateChecker partitionProducerStateChecker) {
+		this.partitionProducerStateChecker = partitionProducerStateChecker;
+		return this;
+	}
+
+	public TestTaskBuilder setKvStateService(KvStateService kvStateService) {
+		this.kvStateService = kvStateService;
+		return this;
+	}
+
+	public TestTaskBuilder setExecutor(Executor executor) {
+		this.executor = executor;
+		return this;
+	}
+
+	public TestTaskBuilder setTaskManagerConfig(Configuration taskManagerConfig) {
+		this.taskManagerConfig = taskManagerConfig;
+		return this;
+	}
+
+	public TestTaskBuilder setExecutionConfig(ExecutionConfig executionConfig) {
+		this.executionConfig = executionConfig;
+		return this;
+	}
+
+	public TestTaskBuilder setRequiredJarFileBlobKeys(Collection<PermanentBlobKey> requiredJarFileBlobKeys) {
+		this.requiredJarFileBlobKeys = requiredJarFileBlobKeys;
+		return this;
+	}
+
+	public TestTaskBuilder setResultPartitions(Collection<ResultPartitionDeploymentDescriptor> resultPartitions) {
+		this.resultPartitions = resultPartitions;
+		return this;
+	}
+
+	public TestTaskBuilder setInputGates(Collection<InputGateDeploymentDescriptor> inputGates) {
+		this.inputGates = inputGates;
+		return this;
+	}
+
+	public Task build() throws Exception {
+		final JobID jobId = new JobID();
+		final JobVertexID jobVertexId = new JobVertexID();
+		final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
+
+		final SerializedValue<ExecutionConfig> serializedExecutionConfig = new SerializedValue<>(executionConfig);
+
+		final JobInformation jobInformation = new JobInformation(
+			jobId,
+			"Test Job",
+			serializedExecutionConfig,
+			new Configuration(),
+			requiredJarFileBlobKeys,
+			Collections.emptyList());
+
+		final TaskInformation taskInformation = new TaskInformation(
+			jobVertexId,
+			"Test Task",
+			1,
+			1,
+			invokable.getName(),
+			new Configuration());
+
+		final BlobCacheService blobCacheService = new BlobCacheService(
+			mock(PermanentBlobCache.class),
+			mock(TransientBlobCache.class));
+
+		final TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+
+		return new Task(
+			jobInformation,
+			taskInformation,
+			executionAttemptId,
+			new AllocationID(),
+			0,
+			0,
+			resultPartitions,
+			inputGates,
+			0,
+			mock(MemoryManager.class),
+			mock(IOManager.class),
+			shuffleEnvironment,
+			kvStateService,
+			mock(BroadcastVariableManager.class),
+			new TaskEventDispatcher(),
+			new TestTaskStateManager(),
+			taskManagerActions,
+			new MockInputSplitProvider(),
+			new TestCheckpointResponder(),
+			new TestGlobalAggregateManager(),
+			blobCacheService,
+			libraryCacheManager,
+			mock(FileCache.class),
+			new TestingTaskManagerRuntimeInfo(taskManagerConfig),
+			taskMetricGroup,
+			consumableNotifier,
+			partitionProducerStateChecker,
+			executor);
+	}
+
+	public static void setTaskState(Task task, ExecutionState state) {
+		try {
+			Field f = Task.class.getDeclaredField("executionState");
+			f.setAccessible(true);
+			f.set(task, state);
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Modifying the task state failed", e);
+		}
+	}
+}