You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/27 18:30:06 UTC
[flink] 01/04: [hotfix][tests] Pull TestTaskBuilder out of TaskTest
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4ccd335175a1ffa18a6e4cb9f1067f37ec7535ff
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Jul 26 19:02:34 2019 +0200
[hotfix][tests] Pull TestTaskBuilder out of TaskTest
---
.../apache/flink/runtime/taskmanager/TaskTest.java | 234 +-------------------
.../flink/runtime/taskmanager/TestTaskBuilder.java | 240 +++++++++++++++++++++
2 files changed, 249 insertions(+), 225 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 5879f52..167c988 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -19,19 +19,14 @@
package org.apache.flink.runtime.taskmanager;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.PermanentBlobKey;
-import org.apache.flink.runtime.blob.TransientBlobCache;
import org.apache.flink.runtime.blob.VoidBlobStore;
-import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -41,42 +36,22 @@ import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.JobInformation;
-import org.apache.flink.runtime.executiongraph.TaskInformation;
-import org.apache.flink.runtime.filecache.FileCache;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
-import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
-import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteChannelStateChecker;
-import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
-import org.apache.flink.runtime.state.TestTaskStateManager;
-import org.apache.flink.runtime.taskexecutor.KvStateService;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
-import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
-import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.WrappingRuntimeException;
@@ -89,7 +64,6 @@ import org.junit.rules.TemporaryFolder;
import javax.annotation.Nonnull;
import java.io.IOException;
-import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
@@ -97,7 +71,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -107,10 +80,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -148,6 +120,7 @@ public class TaskTest extends TestLogger {
public void testRegularExecution() throws Exception {
final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
final Task task = createTaskBuilder()
+ .setInvokable(TestInvokableCorrect.class)
.setTaskManagerActions(taskManagerActions)
.build();
@@ -307,7 +280,7 @@ public class TaskTest extends TestLogger {
final PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
- final Task task = new TaskBuilder(shuffleEnvironment)
+ final Task task = new TestTaskBuilder(shuffleEnvironment)
.setTaskManagerActions(taskManagerActions)
.setConsumableNotifier(consumableNotifier)
.setPartitionProducerStateChecker(partitionProducerStateChecker)
@@ -608,7 +581,7 @@ public class TaskTest extends TestLogger {
int producingStateCounter = 0;
for (ExecutionState state : ExecutionState.values()) {
- setState(task, initialTaskState);
+ TestTaskBuilder.setTaskState(task, initialTaskState);
if (checker.isProducerReadyOrAbortConsumption(task.new PartitionProducerStateResponseHandle(state, null))) {
producingStateCounter++;
@@ -651,7 +624,7 @@ public class TaskTest extends TestLogger {
.setPartitionProducerStateChecker(partitionChecker)
.setExecutor(Executors.directExecutor())
.build();
- setState(task, ExecutionState.RUNNING);
+ TestTaskBuilder.setTaskState(task, ExecutionState.RUNNING);
final CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
@@ -675,7 +648,7 @@ public class TaskTest extends TestLogger {
.setPartitionProducerStateChecker(partitionChecker)
.setExecutor(Executors.directExecutor())
.build();
- setState(task, ExecutionState.RUNNING);
+ TestTaskBuilder.setTaskState(task, ExecutionState.RUNNING);
final CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
@@ -1013,197 +986,8 @@ public class TaskTest extends TestLogger {
// helper functions
// ------------------------------------------------------------------------
- private void setInputGate(Task task, SingleInputGate inputGate) {
- try {
- Field f = Task.class.getDeclaredField("inputGates");
- f.setAccessible(true);
- f.set(task, new SingleInputGate[]{inputGate});
-
- Map<IntermediateDataSetID, SingleInputGate> byId = new HashMap<>(1);
- byId.put(inputGate.getConsumedResultId(), inputGate);
-
- f = Task.class.getDeclaredField("inputGatesById");
- f.setAccessible(true);
- f.set(task, byId);
- }
- catch (Exception e) {
- throw new RuntimeException("Modifying the task state failed", e);
- }
- }
-
- private void setState(Task task, ExecutionState state) {
- try {
- Field f = Task.class.getDeclaredField("executionState");
- f.setAccessible(true);
- f.set(task, state);
- }
- catch (Exception e) {
- throw new RuntimeException("Modifying the task state failed", e);
- }
- }
-
- private TaskBuilder createTaskBuilder() {
- return new TaskBuilder(shuffleEnvironment);
- }
-
- private static final class TaskBuilder {
- private Class<? extends AbstractInvokable> invokable;
- private TaskManagerActions taskManagerActions;
- private LibraryCacheManager libraryCacheManager;
- private ResultPartitionConsumableNotifier consumableNotifier;
- private PartitionProducerStateChecker partitionProducerStateChecker;
- private final ShuffleEnvironment<?, ?> shuffleEnvironment;
- private KvStateService kvStateService;
- private Executor executor;
- private Configuration taskManagerConfig;
- private ExecutionConfig executionConfig;
- private Collection<PermanentBlobKey> requiredJarFileBlobKeys;
- private Collection<ResultPartitionDeploymentDescriptor> resultPartitions = Collections.emptyList();
- private Collection<InputGateDeploymentDescriptor> inputGates = Collections.emptyList();
-
- {
- invokable = TestInvokableCorrect.class;
- taskManagerActions = mock(TaskManagerActions.class);
-
- libraryCacheManager = mock(LibraryCacheManager.class);
- when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
-
- consumableNotifier = new NoOpResultPartitionConsumableNotifier();
- partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
-
- kvStateService = new KvStateService(new KvStateRegistry(), null, null);
-
- executor = TestingUtils.defaultExecutor();
-
- taskManagerConfig = new Configuration();
- executionConfig = new ExecutionConfig();
-
- requiredJarFileBlobKeys = Collections.emptyList();
- }
-
- private TaskBuilder(ShuffleEnvironment<?, ?> shuffleEnvironment) {
- this.shuffleEnvironment = Preconditions.checkNotNull(shuffleEnvironment);
- }
-
- TaskBuilder setInvokable(Class<? extends AbstractInvokable> invokable) {
- this.invokable = invokable;
- return this;
- }
-
- TaskBuilder setTaskManagerActions(TaskManagerActions taskManagerActions) {
- this.taskManagerActions = taskManagerActions;
- return this;
- }
-
- TaskBuilder setLibraryCacheManager(LibraryCacheManager libraryCacheManager) {
- this.libraryCacheManager = libraryCacheManager;
- return this;
- }
-
- TaskBuilder setConsumableNotifier(ResultPartitionConsumableNotifier consumableNotifier) {
- this.consumableNotifier = consumableNotifier;
- return this;
- }
-
- TaskBuilder setPartitionProducerStateChecker(PartitionProducerStateChecker partitionProducerStateChecker) {
- this.partitionProducerStateChecker = partitionProducerStateChecker;
- return this;
- }
-
- TaskBuilder setKvStateService(KvStateService kvStateService) {
- this.kvStateService = kvStateService;
- return this;
- }
-
- TaskBuilder setExecutor(Executor executor) {
- this.executor = executor;
- return this;
- }
-
- TaskBuilder setTaskManagerConfig(Configuration taskManagerConfig) {
- this.taskManagerConfig = taskManagerConfig;
- return this;
- }
-
- TaskBuilder setExecutionConfig(ExecutionConfig executionConfig) {
- this.executionConfig = executionConfig;
- return this;
- }
-
- TaskBuilder setRequiredJarFileBlobKeys(Collection<PermanentBlobKey> requiredJarFileBlobKeys) {
- this.requiredJarFileBlobKeys = requiredJarFileBlobKeys;
- return this;
- }
-
- public TaskBuilder setResultPartitions(Collection<ResultPartitionDeploymentDescriptor> resultPartitions) {
- this.resultPartitions = resultPartitions;
- return this;
- }
-
- public TaskBuilder setInputGates(Collection<InputGateDeploymentDescriptor> inputGates) {
- this.inputGates = inputGates;
- return this;
- }
-
- private Task build() throws Exception {
- final JobID jobId = new JobID();
- final JobVertexID jobVertexId = new JobVertexID();
- final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
-
- final SerializedValue<ExecutionConfig> serializedExecutionConfig = new SerializedValue<>(executionConfig);
-
- final JobInformation jobInformation = new JobInformation(
- jobId,
- "Test Job",
- serializedExecutionConfig,
- new Configuration(),
- requiredJarFileBlobKeys,
- Collections.emptyList());
-
- final TaskInformation taskInformation = new TaskInformation(
- jobVertexId,
- "Test Task",
- 1,
- 1,
- invokable.getName(),
- new Configuration());
-
- final BlobCacheService blobCacheService = new BlobCacheService(
- mock(PermanentBlobCache.class),
- mock(TransientBlobCache.class));
-
- final TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
-
- return new Task(
- jobInformation,
- taskInformation,
- executionAttemptId,
- new AllocationID(),
- 0,
- 0,
- resultPartitions,
- inputGates,
- 0,
- mock(MemoryManager.class),
- mock(IOManager.class),
- shuffleEnvironment,
- kvStateService,
- mock(BroadcastVariableManager.class),
- new TaskEventDispatcher(),
- new TestTaskStateManager(),
- taskManagerActions,
- new MockInputSplitProvider(),
- new TestCheckpointResponder(),
- new TestGlobalAggregateManager(),
- blobCacheService,
- libraryCacheManager,
- mock(FileCache.class),
- new TestingTaskManagerRuntimeInfo(taskManagerConfig),
- taskMetricGroup,
- consumableNotifier,
- partitionProducerStateChecker,
- executor);
- }
+ private TestTaskBuilder createTaskBuilder() {
+ return new TestTaskBuilder(shuffleEnvironment);
}
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
new file mode 100644
index 0000000..97cff7b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobCacheService;
+import org.apache.flink.runtime.blob.PermanentBlobCache;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobCache;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.taskexecutor.KvStateService;
+import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
+import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.Executor;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Util that helps building {@link Task} objects for testing.
+ */
+public final class TestTaskBuilder {
+ private Class<? extends AbstractInvokable> invokable;
+ private TaskManagerActions taskManagerActions;
+ private LibraryCacheManager libraryCacheManager;
+ private ResultPartitionConsumableNotifier consumableNotifier;
+ private PartitionProducerStateChecker partitionProducerStateChecker;
+ private final ShuffleEnvironment<?, ?> shuffleEnvironment;
+ private KvStateService kvStateService;
+ private Executor executor;
+ private Configuration taskManagerConfig;
+ private ExecutionConfig executionConfig;
+ private Collection<PermanentBlobKey> requiredJarFileBlobKeys;
+ private Collection<ResultPartitionDeploymentDescriptor> resultPartitions = Collections.emptyList();
+ private Collection<InputGateDeploymentDescriptor> inputGates = Collections.emptyList();
+
+ {
+ invokable = AbstractInvokable.class;
+ taskManagerActions = mock(TaskManagerActions.class);
+
+ libraryCacheManager = mock(LibraryCacheManager.class);
+ when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
+
+ consumableNotifier = new NoOpResultPartitionConsumableNotifier();
+ partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
+
+ kvStateService = new KvStateService(new KvStateRegistry(), null, null);
+
+ executor = TestingUtils.defaultExecutor();
+
+ taskManagerConfig = new Configuration();
+ executionConfig = new ExecutionConfig();
+
+ requiredJarFileBlobKeys = Collections.emptyList();
+ }
+
+ public TestTaskBuilder(ShuffleEnvironment<?, ?> shuffleEnvironment) {
+ this.shuffleEnvironment = Preconditions.checkNotNull(shuffleEnvironment);
+ }
+
+ public TestTaskBuilder setInvokable(Class<? extends AbstractInvokable> invokable) {
+ this.invokable = invokable;
+ return this;
+ }
+
+ public TestTaskBuilder setTaskManagerActions(TaskManagerActions taskManagerActions) {
+ this.taskManagerActions = taskManagerActions;
+ return this;
+ }
+
+ public TestTaskBuilder setLibraryCacheManager(LibraryCacheManager libraryCacheManager) {
+ this.libraryCacheManager = libraryCacheManager;
+ return this;
+ }
+
+ public TestTaskBuilder setConsumableNotifier(ResultPartitionConsumableNotifier consumableNotifier) {
+ this.consumableNotifier = consumableNotifier;
+ return this;
+ }
+
+ public TestTaskBuilder setPartitionProducerStateChecker(PartitionProducerStateChecker partitionProducerStateChecker) {
+ this.partitionProducerStateChecker = partitionProducerStateChecker;
+ return this;
+ }
+
+ public TestTaskBuilder setKvStateService(KvStateService kvStateService) {
+ this.kvStateService = kvStateService;
+ return this;
+ }
+
+ public TestTaskBuilder setExecutor(Executor executor) {
+ this.executor = executor;
+ return this;
+ }
+
+ public TestTaskBuilder setTaskManagerConfig(Configuration taskManagerConfig) {
+ this.taskManagerConfig = taskManagerConfig;
+ return this;
+ }
+
+ public TestTaskBuilder setExecutionConfig(ExecutionConfig executionConfig) {
+ this.executionConfig = executionConfig;
+ return this;
+ }
+
+ public TestTaskBuilder setRequiredJarFileBlobKeys(Collection<PermanentBlobKey> requiredJarFileBlobKeys) {
+ this.requiredJarFileBlobKeys = requiredJarFileBlobKeys;
+ return this;
+ }
+
+ public TestTaskBuilder setResultPartitions(Collection<ResultPartitionDeploymentDescriptor> resultPartitions) {
+ this.resultPartitions = resultPartitions;
+ return this;
+ }
+
+ public TestTaskBuilder setInputGates(Collection<InputGateDeploymentDescriptor> inputGates) {
+ this.inputGates = inputGates;
+ return this;
+ }
+
+ public Task build() throws Exception {
+ final JobID jobId = new JobID();
+ final JobVertexID jobVertexId = new JobVertexID();
+ final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
+
+ final SerializedValue<ExecutionConfig> serializedExecutionConfig = new SerializedValue<>(executionConfig);
+
+ final JobInformation jobInformation = new JobInformation(
+ jobId,
+ "Test Job",
+ serializedExecutionConfig,
+ new Configuration(),
+ requiredJarFileBlobKeys,
+ Collections.emptyList());
+
+ final TaskInformation taskInformation = new TaskInformation(
+ jobVertexId,
+ "Test Task",
+ 1,
+ 1,
+ invokable.getName(),
+ new Configuration());
+
+ final BlobCacheService blobCacheService = new BlobCacheService(
+ mock(PermanentBlobCache.class),
+ mock(TransientBlobCache.class));
+
+ final TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+
+ return new Task(
+ jobInformation,
+ taskInformation,
+ executionAttemptId,
+ new AllocationID(),
+ 0,
+ 0,
+ resultPartitions,
+ inputGates,
+ 0,
+ mock(MemoryManager.class),
+ mock(IOManager.class),
+ shuffleEnvironment,
+ kvStateService,
+ mock(BroadcastVariableManager.class),
+ new TaskEventDispatcher(),
+ new TestTaskStateManager(),
+ taskManagerActions,
+ new MockInputSplitProvider(),
+ new TestCheckpointResponder(),
+ new TestGlobalAggregateManager(),
+ blobCacheService,
+ libraryCacheManager,
+ mock(FileCache.class),
+ new TestingTaskManagerRuntimeInfo(taskManagerConfig),
+ taskMetricGroup,
+ consumableNotifier,
+ partitionProducerStateChecker,
+ executor);
+ }
+
+ public static void setTaskState(Task task, ExecutionState state) {
+ try {
+ Field f = Task.class.getDeclaredField("executionState");
+ f.setAccessible(true);
+ f.set(task, state);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Modifying the task state failed", e);
+ }
+ }
+}