You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/27 18:30:05 UTC

[flink] branch master updated (bb82aac -> 21621fb)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from bb82aac  [FLINK-12590][docs] Use HTTPS for all Flink links
     new 4ccd335  [hotfix][tests] Pull TestTaskBuilder out of TaskTest
     new 2107a04  [hotfix][tests] Make NoOpPartitionProducerStateChecker a public reusable test implementation.
     new 816fd52  [hotfix][tests] Reduce some mocking in TestTaskBuilder
     new 21621fb  [FLINK-13325][test] Add test for FLINK-13249

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../partition/consumer/SingleInputGate.java        |   3 +-
 .../partition/consumer/RemoteInputChannelTest.java |  86 ++++++++
 .../partition/consumer/SingleInputGateBuilder.java |   9 +-
 .../NoOpPartitionProducerStateChecker.java}        |  22 +-
 .../apache/flink/runtime/taskmanager/TaskTest.java | 234 +--------------------
 .../flink/runtime/taskmanager/TestTaskBuilder.java | 221 +++++++++++++++++++
 .../runtime/util/JvmExitOnFatalErrorTest.java      |  15 +-
 7 files changed, 334 insertions(+), 256 deletions(-)
 copy flink-runtime/src/{main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java => test/java/org/apache/flink/runtime/taskexecutor/NoOpPartitionProducerStateChecker.java} (63%)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java


[flink] 02/04: [hotfix][tests] Make NoOpPartitionProducerStateChecker a public reusable test implementation.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2107a044e626481adc357d636daa3777249dcd9a
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Jul 26 19:12:04 2019 +0200

    [hotfix][tests] Make NoOpPartitionProducerStateChecker a public reusable test implementation.
---
 .../NoOpPartitionProducerStateChecker.java         | 39 ++++++++++++++++++++++
 .../runtime/util/JvmExitOnFatalErrorTest.java      | 15 +--------
 2 files changed, 40 insertions(+), 14 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NoOpPartitionProducerStateChecker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NoOpPartitionProducerStateChecker.java
new file mode 100644
index 0000000..f0ab5d2
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NoOpPartitionProducerStateChecker.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A No-Op implementation of the {@link PartitionProducerStateChecker}.
+ */
+public class NoOpPartitionProducerStateChecker implements PartitionProducerStateChecker {
+
+	@Override
+	public CompletableFuture<ExecutionState> requestPartitionProducerState(
+			JobID jobId, IntermediateDataSetID intermediateDataSetId, ResultPartitionID r) {
+
+		return null;
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index ba671fd..02ca74d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -34,7 +34,6 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -46,10 +45,8 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
+import org.apache.flink.runtime.taskexecutor.NoOpPartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -77,7 +74,6 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.util.Collections;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 
@@ -268,14 +264,5 @@ public class JvmExitOnFatalErrorTest {
 			@Override
 			public void declineCheckpoint(JobID j, ExecutionAttemptID e, long l, Throwable t) {}
 		}
-
-		private static final class NoOpPartitionProducerStateChecker implements PartitionProducerStateChecker {
-
-			@Override
-			public CompletableFuture<ExecutionState> requestPartitionProducerState(
-					JobID jobId, IntermediateDataSetID intermediateDataSetId, ResultPartitionID r) {
-				return null;
-			}
-		}
 	}
 }


[flink] 04/04: [FLINK-13325][test] Add test for FLINK-13249

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 21621fbcde534969b748f21e9f8983e3f4e0fb1d
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Jul 26 19:16:48 2019 +0200

    [FLINK-13325][test] Add test for FLINK-13249
    
    FLINK-13249 was a bug where a deadlock occurred when the network thread got blocked on a lock
    while requesting partitions to be read by remote channels. The test mimics that situation
    to guard the fix applied in an earlier commit.
---
 .../partition/consumer/SingleInputGate.java        |  3 +-
 .../partition/consumer/RemoteInputChannelTest.java | 86 ++++++++++++++++++++++
 .../partition/consumer/SingleInputGateBuilder.java |  9 ++-
 3 files changed, 96 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 534078d9..fd40c94 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -215,7 +215,8 @@ public class SingleInputGate extends InputGate {
 		requestPartitions();
 	}
 
-	private void requestPartitions() throws IOException, InterruptedException {
+	@VisibleForTesting
+	void requestPartitions() throws IOException, InterruptedException {
 		synchronized (requestLock) {
 			if (!requestedPartitionsFlag) {
 				if (closeFuture.isDone()) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 0fdebf0..c3b2fd1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -19,11 +19,16 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.core.memory.MemorySegmentProvider;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.execution.CancelTaskException;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.TestingPartitionRequestClient;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferListener.NotificationResult;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -33,10 +38,14 @@ import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
+import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TestTaskBuilder;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import javax.annotation.Nullable;
@@ -46,9 +55,13 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -1082,6 +1095,79 @@ public class RemoteInputChannelTest {
 	}
 
 	/**
+	 * Test to guard against FLINK-13249.
+	 */
+	@Test
+	public void testOnFailedPartitionRequestDoesNotBlockNetworkThreads() throws Exception {
+
+		final long testBlockedWaitTimeoutMillis = 30_000L;
+
+		final PartitionProducerStateChecker partitionProducerStateChecker =
+			(jobId, intermediateDataSetId, resultPartitionId) -> CompletableFuture.completedFuture(ExecutionState.RUNNING);
+		final NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();
+		final Task task = new TestTaskBuilder(shuffleEnvironment)
+			.setPartitionProducerStateChecker(partitionProducerStateChecker)
+			.build();
+		final SingleInputGate inputGate = new SingleInputGateBuilder()
+			.setPartitionProducerStateProvider(task)
+			.build();
+
+		TestTaskBuilder.setTaskState(task, ExecutionState.RUNNING);
+
+		final OneShotLatch ready = new OneShotLatch();
+		final OneShotLatch blocker = new OneShotLatch();
+		final AtomicBoolean timedOutOrInterrupted = new AtomicBoolean(false);
+
+		final ConnectionManager blockingConnectionManager = new TestingConnectionManager() {
+
+			@Override
+			public PartitionRequestClient createPartitionRequestClient(
+				ConnectionID connectionId) {
+				ready.trigger();
+				try {
+					// We block here, in a section that holds the SingleInputGate#requestLock
+					blocker.await(testBlockedWaitTimeoutMillis, TimeUnit.MILLISECONDS);
+				} catch (InterruptedException | TimeoutException e) {
+					timedOutOrInterrupted.set(true);
+				}
+
+				return new TestingPartitionRequestClient();
+			}
+		};
+
+		final RemoteInputChannel remoteInputChannel =
+			InputChannelBuilder.newBuilder()
+				.setConnectionManager(blockingConnectionManager)
+				.buildRemoteAndSetToGate(inputGate);
+
+		final Thread simulatedNetworkThread = new Thread(
+			() -> {
+				try {
+					ready.await();
+					// We want to make sure that our simulated network thread does not block on
+					// SingleInputGate#requestLock as well through this call.
+					remoteInputChannel.onFailedPartitionRequest();
+
+					// Will only release the blocker if we did not block ourselves.
+					blocker.trigger();
+				} catch (InterruptedException e) {
+					Thread.currentThread().interrupt();
+				}
+			});
+
+		simulatedNetworkThread.start();
+
+		// The entry point that will lead us into blockingConnectionManager#createPartitionRequestClient(...).
+		inputGate.requestPartitions();
+
+		simulatedNetworkThread.join();
+
+		Assert.assertFalse(
+			"Test ended by timeout or interruption - this indicates that the network thread was blocked.",
+			timedOutOrInterrupted.get());
+	}
+
+	/**
 	 * Requests the exclusive buffers from input channel first and then recycles them by a callable task.
 	 *
 	 * @param inputChannel The input channel that exclusive buffers request from.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
index 944cc07..6fa5433 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
@@ -43,7 +43,7 @@ public class SingleInputGateBuilder {
 
 	private int numberOfChannels = 1;
 
-	private final PartitionProducerStateProvider partitionProducerStateProvider = NO_OP_PRODUCER_CHECKER;
+	private PartitionProducerStateProvider partitionProducerStateProvider = NO_OP_PRODUCER_CHECKER;
 
 	private boolean isCreditBased = true;
 
@@ -51,6 +51,13 @@ public class SingleInputGateBuilder {
 		throw new UnsupportedOperationException();
 	};
 
+	public SingleInputGateBuilder setPartitionProducerStateProvider(
+		PartitionProducerStateProvider partitionProducerStateProvider) {
+
+		this.partitionProducerStateProvider = partitionProducerStateProvider;
+		return this;
+	}
+
 	public SingleInputGateBuilder setResultPartitionType(ResultPartitionType partitionType) {
 		this.partitionType = partitionType;
 		return this;


[flink] 01/04: [hotfix][tests] Pull TestTaskBuilder out of TaskTest

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4ccd335175a1ffa18a6e4cb9f1067f37ec7535ff
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Jul 26 19:02:34 2019 +0200

    [hotfix][tests] Pull TestTaskBuilder out of TaskTest
---
 .../apache/flink/runtime/taskmanager/TaskTest.java | 234 +-------------------
 .../flink/runtime/taskmanager/TestTaskBuilder.java | 240 +++++++++++++++++++++
 2 files changed, 249 insertions(+), 225 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 5879f52..167c988 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -19,19 +19,14 @@
 package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.PermanentBlobCache;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
-import org.apache.flink.runtime.blob.TransientBlobCache;
 import org.apache.flink.runtime.blob.VoidBlobStore;
-import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -41,42 +36,22 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.JobInformation;
-import org.apache.flink.runtime.executiongraph.TaskInformation;
-import org.apache.flink.runtime.filecache.FileCache;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
-import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
-import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteChannelStateChecker;
-import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
-import org.apache.flink.runtime.state.TestTaskStateManager;
-import org.apache.flink.runtime.taskexecutor.KvStateService;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
-import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
-import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.WrappingRuntimeException;
 
@@ -89,7 +64,6 @@ import org.junit.rules.TemporaryFolder;
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.Collections;
@@ -97,7 +71,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -107,10 +80,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -148,6 +120,7 @@ public class TaskTest extends TestLogger {
 	public void testRegularExecution() throws Exception {
 		final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
 		final Task task = createTaskBuilder()
+			.setInvokable(TestInvokableCorrect.class)
 			.setTaskManagerActions(taskManagerActions)
 			.build();
 
@@ -307,7 +280,7 @@ public class TaskTest extends TestLogger {
 		final PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
 
 		final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
-		final Task task = new TaskBuilder(shuffleEnvironment)
+		final Task task = new TestTaskBuilder(shuffleEnvironment)
 			.setTaskManagerActions(taskManagerActions)
 			.setConsumableNotifier(consumableNotifier)
 			.setPartitionProducerStateChecker(partitionProducerStateChecker)
@@ -608,7 +581,7 @@ public class TaskTest extends TestLogger {
 
 		int producingStateCounter = 0;
 		for (ExecutionState state : ExecutionState.values()) {
-			setState(task, initialTaskState);
+			TestTaskBuilder.setTaskState(task, initialTaskState);
 
 			if (checker.isProducerReadyOrAbortConsumption(task.new PartitionProducerStateResponseHandle(state, null))) {
 				producingStateCounter++;
@@ -651,7 +624,7 @@ public class TaskTest extends TestLogger {
 				.setPartitionProducerStateChecker(partitionChecker)
 				.setExecutor(Executors.directExecutor())
 				.build();
-			setState(task, ExecutionState.RUNNING);
+			TestTaskBuilder.setTaskState(task, ExecutionState.RUNNING);
 
 			final CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
 			when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
@@ -675,7 +648,7 @@ public class TaskTest extends TestLogger {
 				.setPartitionProducerStateChecker(partitionChecker)
 				.setExecutor(Executors.directExecutor())
 				.build();
-			setState(task, ExecutionState.RUNNING);
+			TestTaskBuilder.setTaskState(task, ExecutionState.RUNNING);
 
 			final CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
 			when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
@@ -1013,197 +986,8 @@ public class TaskTest extends TestLogger {
 	//  helper functions
 	// ------------------------------------------------------------------------
 
-	private void setInputGate(Task task, SingleInputGate inputGate) {
-		try {
-			Field f = Task.class.getDeclaredField("inputGates");
-			f.setAccessible(true);
-			f.set(task, new SingleInputGate[]{inputGate});
-
-			Map<IntermediateDataSetID, SingleInputGate> byId = new HashMap<>(1);
-			byId.put(inputGate.getConsumedResultId(), inputGate);
-
-			f = Task.class.getDeclaredField("inputGatesById");
-			f.setAccessible(true);
-			f.set(task, byId);
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Modifying the task state failed", e);
-		}
-	}
-
-	private void setState(Task task, ExecutionState state) {
-		try {
-			Field f = Task.class.getDeclaredField("executionState");
-			f.setAccessible(true);
-			f.set(task, state);
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Modifying the task state failed", e);
-		}
-	}
-
-	private TaskBuilder createTaskBuilder() {
-		return new TaskBuilder(shuffleEnvironment);
-	}
-
-	private static final class TaskBuilder {
-		private Class<? extends AbstractInvokable> invokable;
-		private TaskManagerActions taskManagerActions;
-		private LibraryCacheManager libraryCacheManager;
-		private ResultPartitionConsumableNotifier consumableNotifier;
-		private PartitionProducerStateChecker partitionProducerStateChecker;
-		private final ShuffleEnvironment<?, ?> shuffleEnvironment;
-		private KvStateService kvStateService;
-		private Executor executor;
-		private Configuration taskManagerConfig;
-		private ExecutionConfig executionConfig;
-		private Collection<PermanentBlobKey> requiredJarFileBlobKeys;
-		private Collection<ResultPartitionDeploymentDescriptor> resultPartitions = Collections.emptyList();
-		private Collection<InputGateDeploymentDescriptor> inputGates = Collections.emptyList();
-
-		{
-			invokable = TestInvokableCorrect.class;
-			taskManagerActions = mock(TaskManagerActions.class);
-
-			libraryCacheManager = mock(LibraryCacheManager.class);
-			when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
-
-			consumableNotifier = new NoOpResultPartitionConsumableNotifier();
-			partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
-
-			kvStateService = new KvStateService(new KvStateRegistry(), null, null);
-
-			executor = TestingUtils.defaultExecutor();
-
-			taskManagerConfig = new Configuration();
-			executionConfig = new ExecutionConfig();
-
-			requiredJarFileBlobKeys = Collections.emptyList();
-		}
-
-		private TaskBuilder(ShuffleEnvironment<?, ?> shuffleEnvironment) {
-			this.shuffleEnvironment = Preconditions.checkNotNull(shuffleEnvironment);
-		}
-
-		TaskBuilder setInvokable(Class<? extends AbstractInvokable> invokable) {
-			this.invokable = invokable;
-			return this;
-		}
-
-		TaskBuilder setTaskManagerActions(TaskManagerActions taskManagerActions) {
-			this.taskManagerActions = taskManagerActions;
-			return this;
-		}
-
-		TaskBuilder setLibraryCacheManager(LibraryCacheManager libraryCacheManager) {
-			this.libraryCacheManager = libraryCacheManager;
-			return this;
-		}
-
-		TaskBuilder setConsumableNotifier(ResultPartitionConsumableNotifier consumableNotifier) {
-			this.consumableNotifier = consumableNotifier;
-			return this;
-		}
-
-		TaskBuilder setPartitionProducerStateChecker(PartitionProducerStateChecker partitionProducerStateChecker) {
-			this.partitionProducerStateChecker = partitionProducerStateChecker;
-			return this;
-		}
-
-		TaskBuilder setKvStateService(KvStateService kvStateService) {
-			this.kvStateService = kvStateService;
-			return this;
-		}
-
-		TaskBuilder setExecutor(Executor executor) {
-			this.executor = executor;
-			return this;
-		}
-
-		TaskBuilder setTaskManagerConfig(Configuration taskManagerConfig) {
-			this.taskManagerConfig = taskManagerConfig;
-			return this;
-		}
-
-		TaskBuilder setExecutionConfig(ExecutionConfig executionConfig) {
-			this.executionConfig = executionConfig;
-			return this;
-		}
-
-		TaskBuilder setRequiredJarFileBlobKeys(Collection<PermanentBlobKey> requiredJarFileBlobKeys) {
-			this.requiredJarFileBlobKeys = requiredJarFileBlobKeys;
-			return this;
-		}
-
-		public TaskBuilder setResultPartitions(Collection<ResultPartitionDeploymentDescriptor> resultPartitions) {
-			this.resultPartitions = resultPartitions;
-			return this;
-		}
-
-		public TaskBuilder setInputGates(Collection<InputGateDeploymentDescriptor> inputGates) {
-			this.inputGates = inputGates;
-			return this;
-		}
-
-		private Task build() throws Exception {
-			final JobID jobId = new JobID();
-			final JobVertexID jobVertexId = new JobVertexID();
-			final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
-
-			final SerializedValue<ExecutionConfig> serializedExecutionConfig = new SerializedValue<>(executionConfig);
-
-			final JobInformation jobInformation = new JobInformation(
-				jobId,
-				"Test Job",
-				serializedExecutionConfig,
-				new Configuration(),
-				requiredJarFileBlobKeys,
-				Collections.emptyList());
-
-			final TaskInformation taskInformation = new TaskInformation(
-				jobVertexId,
-				"Test Task",
-				1,
-				1,
-				invokable.getName(),
-				new Configuration());
-
-			final BlobCacheService blobCacheService = new BlobCacheService(
-				mock(PermanentBlobCache.class),
-				mock(TransientBlobCache.class));
-
-			final TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
-
-			return new Task(
-				jobInformation,
-				taskInformation,
-				executionAttemptId,
-				new AllocationID(),
-				0,
-				0,
-				resultPartitions,
-				inputGates,
-				0,
-				mock(MemoryManager.class),
-				mock(IOManager.class),
-				shuffleEnvironment,
-				kvStateService,
-				mock(BroadcastVariableManager.class),
-				new TaskEventDispatcher(),
-				new TestTaskStateManager(),
-				taskManagerActions,
-				new MockInputSplitProvider(),
-				new TestCheckpointResponder(),
-				new TestGlobalAggregateManager(),
-				blobCacheService,
-				libraryCacheManager,
-				mock(FileCache.class),
-				new TestingTaskManagerRuntimeInfo(taskManagerConfig),
-				taskMetricGroup,
-				consumableNotifier,
-				partitionProducerStateChecker,
-				executor);
-		}
+	private TestTaskBuilder createTaskBuilder() {
+		return new TestTaskBuilder(shuffleEnvironment);
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
new file mode 100644
index 0000000..97cff7b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobCacheService;
+import org.apache.flink.runtime.blob.PermanentBlobCache;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobCache;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.taskexecutor.KvStateService;
+import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
+import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.Executor;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Util that helps building {@link Task} objects for testing.
+ */
+public final class TestTaskBuilder {
+	private Class<? extends AbstractInvokable> invokable;
+	private TaskManagerActions taskManagerActions;
+	private LibraryCacheManager libraryCacheManager;
+	private ResultPartitionConsumableNotifier consumableNotifier;
+	private PartitionProducerStateChecker partitionProducerStateChecker;
+	private final ShuffleEnvironment<?, ?> shuffleEnvironment;
+	private KvStateService kvStateService;
+	private Executor executor;
+	private Configuration taskManagerConfig;
+	private ExecutionConfig executionConfig;
+	private Collection<PermanentBlobKey> requiredJarFileBlobKeys;
+	private Collection<ResultPartitionDeploymentDescriptor> resultPartitions = Collections.emptyList();
+	private Collection<InputGateDeploymentDescriptor> inputGates = Collections.emptyList();
+
+	{
+		invokable = AbstractInvokable.class;
+		taskManagerActions = mock(TaskManagerActions.class);
+
+		libraryCacheManager = mock(LibraryCacheManager.class);
+		when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
+
+		consumableNotifier = new NoOpResultPartitionConsumableNotifier();
+		partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
+
+		kvStateService = new KvStateService(new KvStateRegistry(), null, null);
+
+		executor = TestingUtils.defaultExecutor();
+
+		taskManagerConfig = new Configuration();
+		executionConfig = new ExecutionConfig();
+
+		requiredJarFileBlobKeys = Collections.emptyList();
+	}
+
+	public TestTaskBuilder(ShuffleEnvironment<?, ?> shuffleEnvironment) {
+		this.shuffleEnvironment = Preconditions.checkNotNull(shuffleEnvironment);
+	}
+
+	public TestTaskBuilder setInvokable(Class<? extends AbstractInvokable> invokable) {
+		this.invokable = invokable;
+		return this;
+	}
+
+	public TestTaskBuilder setTaskManagerActions(TaskManagerActions taskManagerActions) {
+		this.taskManagerActions = taskManagerActions;
+		return this;
+	}
+
+	public TestTaskBuilder setLibraryCacheManager(LibraryCacheManager libraryCacheManager) {
+		this.libraryCacheManager = libraryCacheManager;
+		return this;
+	}
+
+	public TestTaskBuilder setConsumableNotifier(ResultPartitionConsumableNotifier consumableNotifier) {
+		this.consumableNotifier = consumableNotifier;
+		return this;
+	}
+
+	public TestTaskBuilder setPartitionProducerStateChecker(PartitionProducerStateChecker partitionProducerStateChecker) {
+		this.partitionProducerStateChecker = partitionProducerStateChecker;
+		return this;
+	}
+
+	public TestTaskBuilder setKvStateService(KvStateService kvStateService) {
+		this.kvStateService = kvStateService;
+		return this;
+	}
+
+	public TestTaskBuilder setExecutor(Executor executor) {
+		this.executor = executor;
+		return this;
+	}
+
+	public TestTaskBuilder setTaskManagerConfig(Configuration taskManagerConfig) {
+		this.taskManagerConfig = taskManagerConfig;
+		return this;
+	}
+
+	public TestTaskBuilder setExecutionConfig(ExecutionConfig executionConfig) {
+		this.executionConfig = executionConfig;
+		return this;
+	}
+
+	public TestTaskBuilder setRequiredJarFileBlobKeys(Collection<PermanentBlobKey> requiredJarFileBlobKeys) {
+		this.requiredJarFileBlobKeys = requiredJarFileBlobKeys;
+		return this;
+	}
+
+	public TestTaskBuilder setResultPartitions(Collection<ResultPartitionDeploymentDescriptor> resultPartitions) {
+		this.resultPartitions = resultPartitions;
+		return this;
+	}
+
+	public TestTaskBuilder setInputGates(Collection<InputGateDeploymentDescriptor> inputGates) {
+		this.inputGates = inputGates;
+		return this;
+	}
+
+	public Task build() throws Exception {
+		final JobID jobId = new JobID();
+		final JobVertexID jobVertexId = new JobVertexID();
+		final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
+
+		final SerializedValue<ExecutionConfig> serializedExecutionConfig = new SerializedValue<>(executionConfig);
+
+		final JobInformation jobInformation = new JobInformation(
+			jobId,
+			"Test Job",
+			serializedExecutionConfig,
+			new Configuration(),
+			requiredJarFileBlobKeys,
+			Collections.emptyList());
+
+		final TaskInformation taskInformation = new TaskInformation(
+			jobVertexId,
+			"Test Task",
+			1,
+			1,
+			invokable.getName(),
+			new Configuration());
+
+		final BlobCacheService blobCacheService = new BlobCacheService(
+			mock(PermanentBlobCache.class),
+			mock(TransientBlobCache.class));
+
+		final TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+
+		return new Task(
+			jobInformation,
+			taskInformation,
+			executionAttemptId,
+			new AllocationID(),
+			0,
+			0,
+			resultPartitions,
+			inputGates,
+			0,
+			mock(MemoryManager.class),
+			mock(IOManager.class),
+			shuffleEnvironment,
+			kvStateService,
+			mock(BroadcastVariableManager.class),
+			new TaskEventDispatcher(),
+			new TestTaskStateManager(),
+			taskManagerActions,
+			new MockInputSplitProvider(),
+			new TestCheckpointResponder(),
+			new TestGlobalAggregateManager(),
+			blobCacheService,
+			libraryCacheManager,
+			mock(FileCache.class),
+			new TestingTaskManagerRuntimeInfo(taskManagerConfig),
+			taskMetricGroup,
+			consumableNotifier,
+			partitionProducerStateChecker,
+			executor);
+	}
+
+	public static void setTaskState(Task task, ExecutionState state) {
+		try {
+			Field f = Task.class.getDeclaredField("executionState");
+			f.setAccessible(true);
+			f.set(task, state);
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Modifying the task state failed", e);
+		}
+	}
+}


[flink] 03/04: [hotfix][tests] Reduce some mocking in TestTaskBuilder

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 816fd523c3bf08e11ba26c8c7231f79e7c8288da
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Jul 26 19:16:18 2019 +0200

    [hotfix][tests] Reduce some mocking in TestTaskBuilder
---
 .../flink/runtime/taskmanager/TestTaskBuilder.java | 49 +++++++---------------
 1 file changed, 15 insertions(+), 34 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
index 97cff7b..ec1204f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.ContextClassLoaderLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
@@ -49,6 +50,7 @@ import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.taskexecutor.KvStateService;
+import org.apache.flink.runtime.taskexecutor.NoOpPartitionProducerStateChecker;
 import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
 import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -61,48 +63,27 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.Executor;
 
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Util that helps building {@link Task} objects for testing.
  */
 public final class TestTaskBuilder {
-	private Class<? extends AbstractInvokable> invokable;
-	private TaskManagerActions taskManagerActions;
-	private LibraryCacheManager libraryCacheManager;
-	private ResultPartitionConsumableNotifier consumableNotifier;
-	private PartitionProducerStateChecker partitionProducerStateChecker;
+
+	private Class<? extends AbstractInvokable> invokable = AbstractInvokable.class;
+	private TaskManagerActions taskManagerActions = new NoOpTaskManagerActions();
+	private LibraryCacheManager libraryCacheManager = ContextClassLoaderLibraryCacheManager.INSTANCE;
+	private ResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier();
+	private PartitionProducerStateChecker partitionProducerStateChecker = new NoOpPartitionProducerStateChecker();
 	private final ShuffleEnvironment<?, ?> shuffleEnvironment;
-	private KvStateService kvStateService;
-	private Executor executor;
-	private Configuration taskManagerConfig;
-	private ExecutionConfig executionConfig;
-	private Collection<PermanentBlobKey> requiredJarFileBlobKeys;
+	private KvStateService kvStateService = new KvStateService(new KvStateRegistry(), null, null);
+	private Executor executor = TestingUtils.defaultExecutor();
+	private Configuration taskManagerConfig = new Configuration();
+	private ExecutionConfig executionConfig = new ExecutionConfig();
+	private Collection<PermanentBlobKey> requiredJarFileBlobKeys = Collections.emptyList();
 	private Collection<ResultPartitionDeploymentDescriptor> resultPartitions = Collections.emptyList();
 	private Collection<InputGateDeploymentDescriptor> inputGates = Collections.emptyList();
 
-	{
-		invokable = AbstractInvokable.class;
-		taskManagerActions = mock(TaskManagerActions.class);
-
-		libraryCacheManager = mock(LibraryCacheManager.class);
-		when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
-
-		consumableNotifier = new NoOpResultPartitionConsumableNotifier();
-		partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
-
-		kvStateService = new KvStateService(new KvStateRegistry(), null, null);
-
-		executor = TestingUtils.defaultExecutor();
-
-		taskManagerConfig = new Configuration();
-		executionConfig = new ExecutionConfig();
-
-		requiredJarFileBlobKeys = Collections.emptyList();
-	}
-
 	public TestTaskBuilder(ShuffleEnvironment<?, ?> shuffleEnvironment) {
 		this.shuffleEnvironment = Preconditions.checkNotNull(shuffleEnvironment);
 	}
@@ -206,11 +187,11 @@ public final class TestTaskBuilder {
 			resultPartitions,
 			inputGates,
 			0,
-			mock(MemoryManager.class),
+			new MemoryManager(1024 * 1024, 1),
 			mock(IOManager.class),
 			shuffleEnvironment,
 			kvStateService,
-			mock(BroadcastVariableManager.class),
+			new BroadcastVariableManager(),
 			new TaskEventDispatcher(),
 			new TestTaskStateManager(),
 			taskManagerActions,