You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/27 18:30:56 UTC

[flink] branch release-1.9 updated (bbb4cf2 -> bb61cbe)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from bbb4cf2  [FLINK-13391][table] Fix InputFormatTableSource#getDataStream should not call getReturnType.
     new 2bed8ac  [hotfix][tests] Pull TestTaskBuilder out of TaskTest
     new c798b19  [hotfix][tests] Make NoOpPartitionProducerStateChecker a public reusable test implementation.
     new 891f4c5  [hotfix][tests] Reduce some mocking in TestTaskBuilder
     new bb61cbe  [FLINK-13325][test] Add test for FLINK-13249

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../partition/consumer/SingleInputGate.java        |   3 +-
 .../partition/consumer/RemoteInputChannelTest.java |  86 ++++++++
 .../partition/consumer/SingleInputGateBuilder.java |   9 +-
 .../NoOpPartitionProducerStateChecker.java}        |  22 +-
 .../apache/flink/runtime/taskmanager/TaskTest.java | 234 +--------------------
 .../flink/runtime/taskmanager/TestTaskBuilder.java | 221 +++++++++++++++++++
 .../runtime/util/JvmExitOnFatalErrorTest.java      |  15 +-
 7 files changed, 334 insertions(+), 256 deletions(-)
 copy flink-runtime/src/{main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java => test/java/org/apache/flink/runtime/taskexecutor/NoOpPartitionProducerStateChecker.java} (63%)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java


[flink] 02/04: [hotfix][tests] Make NoOpPartitionProducerStateChecker a public reusable test implementation.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c798b194e7418bfeee429d8004ce9a9f93f63c80
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Jul 26 19:12:04 2019 +0200

    [hotfix][tests] Make NoOpPartitionProducerStateChecker a public reusable test implementation.
---
 .../NoOpPartitionProducerStateChecker.java         | 39 ++++++++++++++++++++++
 .../runtime/util/JvmExitOnFatalErrorTest.java      | 15 +--------
 2 files changed, 40 insertions(+), 14 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NoOpPartitionProducerStateChecker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NoOpPartitionProducerStateChecker.java
new file mode 100644
index 0000000..f0ab5d2
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NoOpPartitionProducerStateChecker.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A No-Op implementation of the {@link PartitionProducerStateChecker}.
+ */
+public class NoOpPartitionProducerStateChecker implements PartitionProducerStateChecker {
+
+	@Override
+	public CompletableFuture<ExecutionState> requestPartitionProducerState(
+			JobID jobId, IntermediateDataSetID intermediateDataSetId, ResultPartitionID r) {
+
+		return null;
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index ba671fd..02ca74d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -34,7 +34,6 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -46,10 +45,8 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
+import org.apache.flink.runtime.taskexecutor.NoOpPartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -77,7 +74,6 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.util.Collections;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 
@@ -268,14 +264,5 @@ public class JvmExitOnFatalErrorTest {
 			@Override
 			public void declineCheckpoint(JobID j, ExecutionAttemptID e, long l, Throwable t) {}
 		}
-
-		private static final class NoOpPartitionProducerStateChecker implements PartitionProducerStateChecker {
-
-			@Override
-			public CompletableFuture<ExecutionState> requestPartitionProducerState(
-					JobID jobId, IntermediateDataSetID intermediateDataSetId, ResultPartitionID r) {
-				return null;
-			}
-		}
 	}
 }


[flink] 01/04: [hotfix][tests] Pull TestTaskBuilder out of TaskTest

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2bed8ac7a7ed5193e1c93a8f1fa20a1adde5133c
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Jul 26 19:02:34 2019 +0200

    [hotfix][tests] Pull TestTaskBuilder out of TaskTest
---
 .../apache/flink/runtime/taskmanager/TaskTest.java | 234 +-------------------
 .../flink/runtime/taskmanager/TestTaskBuilder.java | 240 +++++++++++++++++++++
 2 files changed, 249 insertions(+), 225 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 5879f52..167c988 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -19,19 +19,14 @@
 package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.PermanentBlobCache;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
-import org.apache.flink.runtime.blob.TransientBlobCache;
 import org.apache.flink.runtime.blob.VoidBlobStore;
-import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -41,42 +36,22 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.JobInformation;
-import org.apache.flink.runtime.executiongraph.TaskInformation;
-import org.apache.flink.runtime.filecache.FileCache;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
-import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
-import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteChannelStateChecker;
-import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
-import org.apache.flink.runtime.state.TestTaskStateManager;
-import org.apache.flink.runtime.taskexecutor.KvStateService;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
-import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
-import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.WrappingRuntimeException;
 
@@ -89,7 +64,6 @@ import org.junit.rules.TemporaryFolder;
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.Collections;
@@ -97,7 +71,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -107,10 +80,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -148,6 +120,7 @@ public class TaskTest extends TestLogger {
 	public void testRegularExecution() throws Exception {
 		final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
 		final Task task = createTaskBuilder()
+			.setInvokable(TestInvokableCorrect.class)
 			.setTaskManagerActions(taskManagerActions)
 			.build();
 
@@ -307,7 +280,7 @@ public class TaskTest extends TestLogger {
 		final PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
 
 		final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
-		final Task task = new TaskBuilder(shuffleEnvironment)
+		final Task task = new TestTaskBuilder(shuffleEnvironment)
 			.setTaskManagerActions(taskManagerActions)
 			.setConsumableNotifier(consumableNotifier)
 			.setPartitionProducerStateChecker(partitionProducerStateChecker)
@@ -608,7 +581,7 @@ public class TaskTest extends TestLogger {
 
 		int producingStateCounter = 0;
 		for (ExecutionState state : ExecutionState.values()) {
-			setState(task, initialTaskState);
+			TestTaskBuilder.setTaskState(task, initialTaskState);
 
 			if (checker.isProducerReadyOrAbortConsumption(task.new PartitionProducerStateResponseHandle(state, null))) {
 				producingStateCounter++;
@@ -651,7 +624,7 @@ public class TaskTest extends TestLogger {
 				.setPartitionProducerStateChecker(partitionChecker)
 				.setExecutor(Executors.directExecutor())
 				.build();
-			setState(task, ExecutionState.RUNNING);
+			TestTaskBuilder.setTaskState(task, ExecutionState.RUNNING);
 
 			final CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
 			when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
@@ -675,7 +648,7 @@ public class TaskTest extends TestLogger {
 				.setPartitionProducerStateChecker(partitionChecker)
 				.setExecutor(Executors.directExecutor())
 				.build();
-			setState(task, ExecutionState.RUNNING);
+			TestTaskBuilder.setTaskState(task, ExecutionState.RUNNING);
 
 			final CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
 			when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
@@ -1013,197 +986,8 @@ public class TaskTest extends TestLogger {
 	//  helper functions
 	// ------------------------------------------------------------------------
 
-	private void setInputGate(Task task, SingleInputGate inputGate) {
-		try {
-			Field f = Task.class.getDeclaredField("inputGates");
-			f.setAccessible(true);
-			f.set(task, new SingleInputGate[]{inputGate});
-
-			Map<IntermediateDataSetID, SingleInputGate> byId = new HashMap<>(1);
-			byId.put(inputGate.getConsumedResultId(), inputGate);
-
-			f = Task.class.getDeclaredField("inputGatesById");
-			f.setAccessible(true);
-			f.set(task, byId);
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Modifying the task state failed", e);
-		}
-	}
-
-	private void setState(Task task, ExecutionState state) {
-		try {
-			Field f = Task.class.getDeclaredField("executionState");
-			f.setAccessible(true);
-			f.set(task, state);
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Modifying the task state failed", e);
-		}
-	}
-
-	private TaskBuilder createTaskBuilder() {
-		return new TaskBuilder(shuffleEnvironment);
-	}
-
-	private static final class TaskBuilder {
-		private Class<? extends AbstractInvokable> invokable;
-		private TaskManagerActions taskManagerActions;
-		private LibraryCacheManager libraryCacheManager;
-		private ResultPartitionConsumableNotifier consumableNotifier;
-		private PartitionProducerStateChecker partitionProducerStateChecker;
-		private final ShuffleEnvironment<?, ?> shuffleEnvironment;
-		private KvStateService kvStateService;
-		private Executor executor;
-		private Configuration taskManagerConfig;
-		private ExecutionConfig executionConfig;
-		private Collection<PermanentBlobKey> requiredJarFileBlobKeys;
-		private Collection<ResultPartitionDeploymentDescriptor> resultPartitions = Collections.emptyList();
-		private Collection<InputGateDeploymentDescriptor> inputGates = Collections.emptyList();
-
-		{
-			invokable = TestInvokableCorrect.class;
-			taskManagerActions = mock(TaskManagerActions.class);
-
-			libraryCacheManager = mock(LibraryCacheManager.class);
-			when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
-
-			consumableNotifier = new NoOpResultPartitionConsumableNotifier();
-			partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
-
-			kvStateService = new KvStateService(new KvStateRegistry(), null, null);
-
-			executor = TestingUtils.defaultExecutor();
-
-			taskManagerConfig = new Configuration();
-			executionConfig = new ExecutionConfig();
-
-			requiredJarFileBlobKeys = Collections.emptyList();
-		}
-
-		private TaskBuilder(ShuffleEnvironment<?, ?> shuffleEnvironment) {
-			this.shuffleEnvironment = Preconditions.checkNotNull(shuffleEnvironment);
-		}
-
-		TaskBuilder setInvokable(Class<? extends AbstractInvokable> invokable) {
-			this.invokable = invokable;
-			return this;
-		}
-
-		TaskBuilder setTaskManagerActions(TaskManagerActions taskManagerActions) {
-			this.taskManagerActions = taskManagerActions;
-			return this;
-		}
-
-		TaskBuilder setLibraryCacheManager(LibraryCacheManager libraryCacheManager) {
-			this.libraryCacheManager = libraryCacheManager;
-			return this;
-		}
-
-		TaskBuilder setConsumableNotifier(ResultPartitionConsumableNotifier consumableNotifier) {
-			this.consumableNotifier = consumableNotifier;
-			return this;
-		}
-
-		TaskBuilder setPartitionProducerStateChecker(PartitionProducerStateChecker partitionProducerStateChecker) {
-			this.partitionProducerStateChecker = partitionProducerStateChecker;
-			return this;
-		}
-
-		TaskBuilder setKvStateService(KvStateService kvStateService) {
-			this.kvStateService = kvStateService;
-			return this;
-		}
-
-		TaskBuilder setExecutor(Executor executor) {
-			this.executor = executor;
-			return this;
-		}
-
-		TaskBuilder setTaskManagerConfig(Configuration taskManagerConfig) {
-			this.taskManagerConfig = taskManagerConfig;
-			return this;
-		}
-
-		TaskBuilder setExecutionConfig(ExecutionConfig executionConfig) {
-			this.executionConfig = executionConfig;
-			return this;
-		}
-
-		TaskBuilder setRequiredJarFileBlobKeys(Collection<PermanentBlobKey> requiredJarFileBlobKeys) {
-			this.requiredJarFileBlobKeys = requiredJarFileBlobKeys;
-			return this;
-		}
-
-		public TaskBuilder setResultPartitions(Collection<ResultPartitionDeploymentDescriptor> resultPartitions) {
-			this.resultPartitions = resultPartitions;
-			return this;
-		}
-
-		public TaskBuilder setInputGates(Collection<InputGateDeploymentDescriptor> inputGates) {
-			this.inputGates = inputGates;
-			return this;
-		}
-
-		private Task build() throws Exception {
-			final JobID jobId = new JobID();
-			final JobVertexID jobVertexId = new JobVertexID();
-			final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
-
-			final SerializedValue<ExecutionConfig> serializedExecutionConfig = new SerializedValue<>(executionConfig);
-
-			final JobInformation jobInformation = new JobInformation(
-				jobId,
-				"Test Job",
-				serializedExecutionConfig,
-				new Configuration(),
-				requiredJarFileBlobKeys,
-				Collections.emptyList());
-
-			final TaskInformation taskInformation = new TaskInformation(
-				jobVertexId,
-				"Test Task",
-				1,
-				1,
-				invokable.getName(),
-				new Configuration());
-
-			final BlobCacheService blobCacheService = new BlobCacheService(
-				mock(PermanentBlobCache.class),
-				mock(TransientBlobCache.class));
-
-			final TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
-
-			return new Task(
-				jobInformation,
-				taskInformation,
-				executionAttemptId,
-				new AllocationID(),
-				0,
-				0,
-				resultPartitions,
-				inputGates,
-				0,
-				mock(MemoryManager.class),
-				mock(IOManager.class),
-				shuffleEnvironment,
-				kvStateService,
-				mock(BroadcastVariableManager.class),
-				new TaskEventDispatcher(),
-				new TestTaskStateManager(),
-				taskManagerActions,
-				new MockInputSplitProvider(),
-				new TestCheckpointResponder(),
-				new TestGlobalAggregateManager(),
-				blobCacheService,
-				libraryCacheManager,
-				mock(FileCache.class),
-				new TestingTaskManagerRuntimeInfo(taskManagerConfig),
-				taskMetricGroup,
-				consumableNotifier,
-				partitionProducerStateChecker,
-				executor);
-		}
+	private TestTaskBuilder createTaskBuilder() {
+		return new TestTaskBuilder(shuffleEnvironment);
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
new file mode 100644
index 0000000..97cff7b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobCacheService;
+import org.apache.flink.runtime.blob.PermanentBlobCache;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobCache;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.taskexecutor.KvStateService;
+import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
+import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.Executor;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Util that helps building {@link Task} objects for testing.
+ */
+public final class TestTaskBuilder {
+	private Class<? extends AbstractInvokable> invokable;
+	private TaskManagerActions taskManagerActions;
+	private LibraryCacheManager libraryCacheManager;
+	private ResultPartitionConsumableNotifier consumableNotifier;
+	private PartitionProducerStateChecker partitionProducerStateChecker;
+	private final ShuffleEnvironment<?, ?> shuffleEnvironment;
+	private KvStateService kvStateService;
+	private Executor executor;
+	private Configuration taskManagerConfig;
+	private ExecutionConfig executionConfig;
+	private Collection<PermanentBlobKey> requiredJarFileBlobKeys;
+	private Collection<ResultPartitionDeploymentDescriptor> resultPartitions = Collections.emptyList();
+	private Collection<InputGateDeploymentDescriptor> inputGates = Collections.emptyList();
+
+	{
+		invokable = AbstractInvokable.class;
+		taskManagerActions = mock(TaskManagerActions.class);
+
+		libraryCacheManager = mock(LibraryCacheManager.class);
+		when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
+
+		consumableNotifier = new NoOpResultPartitionConsumableNotifier();
+		partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
+
+		kvStateService = new KvStateService(new KvStateRegistry(), null, null);
+
+		executor = TestingUtils.defaultExecutor();
+
+		taskManagerConfig = new Configuration();
+		executionConfig = new ExecutionConfig();
+
+		requiredJarFileBlobKeys = Collections.emptyList();
+	}
+
+	public TestTaskBuilder(ShuffleEnvironment<?, ?> shuffleEnvironment) {
+		this.shuffleEnvironment = Preconditions.checkNotNull(shuffleEnvironment);
+	}
+
+	public TestTaskBuilder setInvokable(Class<? extends AbstractInvokable> invokable) {
+		this.invokable = invokable;
+		return this;
+	}
+
+	public TestTaskBuilder setTaskManagerActions(TaskManagerActions taskManagerActions) {
+		this.taskManagerActions = taskManagerActions;
+		return this;
+	}
+
+	public TestTaskBuilder setLibraryCacheManager(LibraryCacheManager libraryCacheManager) {
+		this.libraryCacheManager = libraryCacheManager;
+		return this;
+	}
+
+	public TestTaskBuilder setConsumableNotifier(ResultPartitionConsumableNotifier consumableNotifier) {
+		this.consumableNotifier = consumableNotifier;
+		return this;
+	}
+
+	public TestTaskBuilder setPartitionProducerStateChecker(PartitionProducerStateChecker partitionProducerStateChecker) {
+		this.partitionProducerStateChecker = partitionProducerStateChecker;
+		return this;
+	}
+
+	public TestTaskBuilder setKvStateService(KvStateService kvStateService) {
+		this.kvStateService = kvStateService;
+		return this;
+	}
+
+	public TestTaskBuilder setExecutor(Executor executor) {
+		this.executor = executor;
+		return this;
+	}
+
+	public TestTaskBuilder setTaskManagerConfig(Configuration taskManagerConfig) {
+		this.taskManagerConfig = taskManagerConfig;
+		return this;
+	}
+
+	public TestTaskBuilder setExecutionConfig(ExecutionConfig executionConfig) {
+		this.executionConfig = executionConfig;
+		return this;
+	}
+
+	public TestTaskBuilder setRequiredJarFileBlobKeys(Collection<PermanentBlobKey> requiredJarFileBlobKeys) {
+		this.requiredJarFileBlobKeys = requiredJarFileBlobKeys;
+		return this;
+	}
+
+	public TestTaskBuilder setResultPartitions(Collection<ResultPartitionDeploymentDescriptor> resultPartitions) {
+		this.resultPartitions = resultPartitions;
+		return this;
+	}
+
+	public TestTaskBuilder setInputGates(Collection<InputGateDeploymentDescriptor> inputGates) {
+		this.inputGates = inputGates;
+		return this;
+	}
+
+	public Task build() throws Exception {
+		final JobID jobId = new JobID();
+		final JobVertexID jobVertexId = new JobVertexID();
+		final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
+
+		final SerializedValue<ExecutionConfig> serializedExecutionConfig = new SerializedValue<>(executionConfig);
+
+		final JobInformation jobInformation = new JobInformation(
+			jobId,
+			"Test Job",
+			serializedExecutionConfig,
+			new Configuration(),
+			requiredJarFileBlobKeys,
+			Collections.emptyList());
+
+		final TaskInformation taskInformation = new TaskInformation(
+			jobVertexId,
+			"Test Task",
+			1,
+			1,
+			invokable.getName(),
+			new Configuration());
+
+		final BlobCacheService blobCacheService = new BlobCacheService(
+			mock(PermanentBlobCache.class),
+			mock(TransientBlobCache.class));
+
+		final TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+
+		return new Task(
+			jobInformation,
+			taskInformation,
+			executionAttemptId,
+			new AllocationID(),
+			0,
+			0,
+			resultPartitions,
+			inputGates,
+			0,
+			mock(MemoryManager.class),
+			mock(IOManager.class),
+			shuffleEnvironment,
+			kvStateService,
+			mock(BroadcastVariableManager.class),
+			new TaskEventDispatcher(),
+			new TestTaskStateManager(),
+			taskManagerActions,
+			new MockInputSplitProvider(),
+			new TestCheckpointResponder(),
+			new TestGlobalAggregateManager(),
+			blobCacheService,
+			libraryCacheManager,
+			mock(FileCache.class),
+			new TestingTaskManagerRuntimeInfo(taskManagerConfig),
+			taskMetricGroup,
+			consumableNotifier,
+			partitionProducerStateChecker,
+			executor);
+	}
+
+	public static void setTaskState(Task task, ExecutionState state) {
+		try {
+			Field f = Task.class.getDeclaredField("executionState");
+			f.setAccessible(true);
+			f.set(task, state);
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Modifying the task state failed", e);
+		}
+	}
+}


[flink] 04/04: [FLINK-13325][test] Add test for FLINK-13249

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bb61cbefb41d31d34c4c6d7a260418bc56994db8
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Jul 26 19:16:48 2019 +0200

    [FLINK-13325][test] Add test for FLINK-13249
    
    FLINK-13249 was a bug where a deadlock occurred when the network thread got blocked on a lock
    while requesting partitions to be read by remote channels. The test mimics that situation
    to guard the fix applied in an earlier commit.
---
 .../partition/consumer/SingleInputGate.java        |  3 +-
 .../partition/consumer/RemoteInputChannelTest.java | 86 ++++++++++++++++++++++
 .../partition/consumer/SingleInputGateBuilder.java |  9 ++-
 3 files changed, 96 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 534078d9..fd40c94 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -215,7 +215,8 @@ public class SingleInputGate extends InputGate {
 		requestPartitions();
 	}
 
-	private void requestPartitions() throws IOException, InterruptedException {
+	@VisibleForTesting
+	void requestPartitions() throws IOException, InterruptedException {
 		synchronized (requestLock) {
 			if (!requestedPartitionsFlag) {
 				if (closeFuture.isDone()) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 0fdebf0..c3b2fd1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -19,11 +19,16 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.core.memory.MemorySegmentProvider;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.execution.CancelTaskException;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.TestingPartitionRequestClient;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferListener.NotificationResult;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -33,10 +38,14 @@ import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
+import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TestTaskBuilder;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import javax.annotation.Nullable;
@@ -46,9 +55,13 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -1082,6 +1095,79 @@ public class RemoteInputChannelTest {
 	}
 
 	/**
+	 * Test to guard against FLINK-13249.
+	 */
+	@Test
+	public void testOnFailedPartitionRequestDoesNotBlockNetworkThreads() throws Exception {
+
+		final long testBlockedWaitTimeoutMillis = 30_000L;
+
+		final PartitionProducerStateChecker partitionProducerStateChecker =
+			(jobId, intermediateDataSetId, resultPartitionId) -> CompletableFuture.completedFuture(ExecutionState.RUNNING);
+		final NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();
+		final Task task = new TestTaskBuilder(shuffleEnvironment)
+			.setPartitionProducerStateChecker(partitionProducerStateChecker)
+			.build();
+		final SingleInputGate inputGate = new SingleInputGateBuilder()
+			.setPartitionProducerStateProvider(task)
+			.build();
+
+		TestTaskBuilder.setTaskState(task, ExecutionState.RUNNING);
+
+		final OneShotLatch ready = new OneShotLatch();
+		final OneShotLatch blocker = new OneShotLatch();
+		final AtomicBoolean timedOutOrInterrupted = new AtomicBoolean(false);
+
+		final ConnectionManager blockingConnectionManager = new TestingConnectionManager() {
+
+			@Override
+			public PartitionRequestClient createPartitionRequestClient(
+				ConnectionID connectionId) {
+				ready.trigger();
+				try {
+					// We block here, in a section that holds the SingleInputGate#requestLock
+					blocker.await(testBlockedWaitTimeoutMillis, TimeUnit.MILLISECONDS);
+				} catch (InterruptedException | TimeoutException e) {
+					timedOutOrInterrupted.set(true);
+				}
+
+				return new TestingPartitionRequestClient();
+			}
+		};
+
+		final RemoteInputChannel remoteInputChannel =
+			InputChannelBuilder.newBuilder()
+				.setConnectionManager(blockingConnectionManager)
+				.buildRemoteAndSetToGate(inputGate);
+
+		final Thread simulatedNetworkThread = new Thread(
+			() -> {
+				try {
+					ready.await();
+					// We want to make sure that our simulated network thread does not block on
+					// SingleInputGate#requestLock as well through this call.
+					remoteInputChannel.onFailedPartitionRequest();
+
+					// Will only release the blocker if we did not block ourselves.
+					blocker.trigger();
+				} catch (InterruptedException e) {
+					Thread.currentThread().interrupt();
+				}
+			});
+
+		simulatedNetworkThread.start();
+
+		// The entry point that will lead us into blockingConnectionManager#createPartitionRequestClient(...).
+		inputGate.requestPartitions();
+
+		simulatedNetworkThread.join();
+
+		Assert.assertFalse(
+			"Test ended by timeout or interruption - this indicates that the network thread was blocked.",
+			timedOutOrInterrupted.get());
+	}
+
+	/**
 	 * Requests the exclusive buffers from input channel first and then recycles them by a callable task.
 	 *
 	 * @param inputChannel The input channel that exclusive buffers request from.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
index 944cc07..6fa5433 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
@@ -43,7 +43,7 @@ public class SingleInputGateBuilder {
 
 	private int numberOfChannels = 1;
 
-	private final PartitionProducerStateProvider partitionProducerStateProvider = NO_OP_PRODUCER_CHECKER;
+	private PartitionProducerStateProvider partitionProducerStateProvider = NO_OP_PRODUCER_CHECKER;
 
 	private boolean isCreditBased = true;
 
@@ -51,6 +51,13 @@ public class SingleInputGateBuilder {
 		throw new UnsupportedOperationException();
 	};
 
+	public SingleInputGateBuilder setPartitionProducerStateProvider(
+		PartitionProducerStateProvider partitionProducerStateProvider) {
+
+		this.partitionProducerStateProvider = partitionProducerStateProvider;
+		return this;
+	}
+
 	public SingleInputGateBuilder setResultPartitionType(ResultPartitionType partitionType) {
 		this.partitionType = partitionType;
 		return this;


[flink] 03/04: [hotfix][tests] Reduce some mocking in TestTaskBuilder

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 891f4c59e06297e82629cb71d2ca929ed52783c8
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Jul 26 19:16:18 2019 +0200

    [hotfix][tests] Reduce some mocking in TestTaskBuilder
---
 .../flink/runtime/taskmanager/TestTaskBuilder.java | 49 +++++++---------------
 1 file changed, 15 insertions(+), 34 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
index 97cff7b..ec1204f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.ContextClassLoaderLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
@@ -49,6 +50,7 @@ import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.taskexecutor.KvStateService;
+import org.apache.flink.runtime.taskexecutor.NoOpPartitionProducerStateChecker;
 import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
 import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -61,48 +63,27 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.Executor;
 
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Util that helps building {@link Task} objects for testing.
  */
 public final class TestTaskBuilder {
-	private Class<? extends AbstractInvokable> invokable;
-	private TaskManagerActions taskManagerActions;
-	private LibraryCacheManager libraryCacheManager;
-	private ResultPartitionConsumableNotifier consumableNotifier;
-	private PartitionProducerStateChecker partitionProducerStateChecker;
+
+	private Class<? extends AbstractInvokable> invokable = AbstractInvokable.class;
+	private TaskManagerActions taskManagerActions = new NoOpTaskManagerActions();
+	private LibraryCacheManager libraryCacheManager = ContextClassLoaderLibraryCacheManager.INSTANCE;
+	private ResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier();
+	private PartitionProducerStateChecker partitionProducerStateChecker = new NoOpPartitionProducerStateChecker();
 	private final ShuffleEnvironment<?, ?> shuffleEnvironment;
-	private KvStateService kvStateService;
-	private Executor executor;
-	private Configuration taskManagerConfig;
-	private ExecutionConfig executionConfig;
-	private Collection<PermanentBlobKey> requiredJarFileBlobKeys;
+	private KvStateService kvStateService = new KvStateService(new KvStateRegistry(), null, null);
+	private Executor executor = TestingUtils.defaultExecutor();
+	private Configuration taskManagerConfig = new Configuration();
+	private ExecutionConfig executionConfig = new ExecutionConfig();
+	private Collection<PermanentBlobKey> requiredJarFileBlobKeys = Collections.emptyList();
 	private Collection<ResultPartitionDeploymentDescriptor> resultPartitions = Collections.emptyList();
 	private Collection<InputGateDeploymentDescriptor> inputGates = Collections.emptyList();
 
-	{
-		invokable = AbstractInvokable.class;
-		taskManagerActions = mock(TaskManagerActions.class);
-
-		libraryCacheManager = mock(LibraryCacheManager.class);
-		when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
-
-		consumableNotifier = new NoOpResultPartitionConsumableNotifier();
-		partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
-
-		kvStateService = new KvStateService(new KvStateRegistry(), null, null);
-
-		executor = TestingUtils.defaultExecutor();
-
-		taskManagerConfig = new Configuration();
-		executionConfig = new ExecutionConfig();
-
-		requiredJarFileBlobKeys = Collections.emptyList();
-	}
-
 	public TestTaskBuilder(ShuffleEnvironment<?, ?> shuffleEnvironment) {
 		this.shuffleEnvironment = Preconditions.checkNotNull(shuffleEnvironment);
 	}
@@ -206,11 +187,11 @@ public final class TestTaskBuilder {
 			resultPartitions,
 			inputGates,
 			0,
-			mock(MemoryManager.class),
+			new MemoryManager(1024 * 1024, 1),
 			mock(IOManager.class),
 			shuffleEnvironment,
 			kvStateService,
-			mock(BroadcastVariableManager.class),
+			new BroadcastVariableManager(),
 			new TaskEventDispatcher(),
 			new TestTaskStateManager(),
 			taskManagerActions,