You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/08 13:29:51 UTC
[flink] 01/02: [hotfix][tests] Avoid mock NetworkEnvironment in
tests
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f392616f404a6471d99d64eeab5dd1dd3da1a4a0
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Thu Apr 25 17:05:21 2019 +0800
[hotfix][tests] Avoid mock NetworkEnvironment in tests
---
.../runtime/taskexecutor/TaskExecutorTest.java | 15 +--
.../runtime/taskmanager/TaskAsyncCallTest.java | 17 +++-
.../apache/flink/runtime/taskmanager/TaskTest.java | 107 +++++++++++----------
.../runtime/util/JvmExitOnFatalErrorTest.java | 3 +-
.../tasks/InterruptSensitiveRestoreTest.java | 3 +-
.../runtime/tasks/StreamTaskTerminationTest.java | 3 +-
.../streaming/runtime/tasks/StreamTaskTest.java | 4 +-
.../runtime/tasks/SynchronousCheckpointITCase.java | 6 +-
.../tasks/TaskCheckpointingBehaviourTest.java | 3 +-
9 files changed, 87 insertions(+), 74 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index b47ad53..bf57e2a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -197,6 +197,8 @@ public class TaskExecutorTest extends TestLogger {
private SettableLeaderRetrievalService jobManagerLeaderRetriever;
+ private NetworkEnvironment networkEnvironment;
+
@Before
public void setup() throws IOException {
rpc = new TestingRpcService();
@@ -220,6 +222,8 @@ public class TaskExecutorTest extends TestLogger {
jobManagerLeaderRetriever = new SettableLeaderRetrievalService();
haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetriever);
+
+ networkEnvironment = new NetworkEnvironmentBuilder().build();
}
@After
@@ -239,6 +243,10 @@ public class TaskExecutorTest extends TestLogger {
dummyBlobCacheService = null;
}
+ if (networkEnvironment != null) {
+ networkEnvironment.shutdown();
+ }
+
testingFatalErrorHandler.rethrowError();
}
@@ -262,7 +270,6 @@ public class TaskExecutorTest extends TestLogger {
MemoryType.HEAP,
false);
- final NetworkEnvironment networkEnvironment = new NetworkEnvironmentBuilder().build();
networkEnvironment.start();
final KvStateService kvStateService = new KvStateService(new KvStateRegistry(), null, null);
@@ -699,8 +706,6 @@ public class TaskExecutorTest extends TestLogger {
when(taskSlotTable.tryMarkSlotActive(eq(jobId), eq(allocationId))).thenReturn(true);
when(taskSlotTable.addTask(any(Task.class))).thenReturn(true);
- final NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
-
final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();
final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
@@ -958,13 +963,11 @@ public class TaskExecutorTest extends TestLogger {
rpc.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
rpc.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);
- final NetworkEnvironment networkMock = mock(NetworkEnvironment.class, Mockito.RETURNS_MOCKS);
-
final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();
final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
.setTaskManagerLocation(taskManagerLocation)
- .setNetworkEnvironment(networkMock)
+ .setNetworkEnvironment(networkEnvironment)
.setTaskSlotTable(taskSlotTable)
.setJobLeaderService(jobLeaderService)
.setJobManagerTable(jobManagerTable)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 9e06da0..1f743e4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -41,11 +41,11 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -60,6 +60,7 @@ import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
+import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@@ -105,6 +106,8 @@ public class TaskAsyncCallTest extends TestLogger {
private static final List<ClassLoader> classLoaders = Collections.synchronizedList(new ArrayList<>());
+ private NetworkEnvironment networkEnvironment;
+
@Before
public void createQueuesAndActors() {
numCalls = 1000;
@@ -114,9 +117,17 @@ public class TaskAsyncCallTest extends TestLogger {
notifyCheckpointCompleteLatch = new OneShotLatch();
stopLatch = new OneShotLatch();
+ networkEnvironment = new NetworkEnvironmentBuilder().build();
+
classLoaders.clear();
}
+ @After
+ public void teardown() {
+ if (networkEnvironment != null) {
+ networkEnvironment.shutdown();
+ }
+ }
// ------------------------------------------------------------------------
// Tests
@@ -211,13 +222,9 @@ public class TaskAsyncCallTest extends TestLogger {
LibraryCacheManager libCache = mock(LibraryCacheManager.class);
when(libCache.getClassLoader(any(JobID.class))).thenReturn(new TestUserCodeClassLoader());
- ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
ResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier();
PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
Executor executor = mock(Executor.class);
- NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
- when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager);
-
TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
JobInformation jobInformation = new JobInformation(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 6a4ac80..4c878db 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -45,12 +45,12 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -67,10 +67,12 @@ import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.WrappingRuntimeException;
+import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
@@ -99,7 +101,6 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -114,6 +115,8 @@ public class TaskTest extends TestLogger {
private static OneShotLatch awaitLatch;
private static OneShotLatch triggerLatch;
+ private NetworkEnvironment networkEnvironment;
+
@ClassRule
public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
@@ -121,12 +124,21 @@ public class TaskTest extends TestLogger {
public void setup() {
awaitLatch = new OneShotLatch();
triggerLatch = new OneShotLatch();
+
+ networkEnvironment = new NetworkEnvironmentBuilder().build();
+ }
+
+ @After
+ public void teardown() {
+ if (networkEnvironment != null) {
+ networkEnvironment.shutdown();
+ }
}
@Test
public void testRegularExecution() throws Exception {
final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
- final Task task = new TaskBuilder()
+ final Task task = createTaskBuilder()
.setTaskManagerActions(taskManagerActions)
.build();
@@ -151,7 +163,7 @@ public class TaskTest extends TestLogger {
@Test
public void testCancelRightAway() throws Exception {
- final Task task = new TaskBuilder().build();
+ final Task task = createTaskBuilder().build();
task.cancelExecution();
assertEquals(ExecutionState.CANCELING, task.getExecutionState());
@@ -166,7 +178,7 @@ public class TaskTest extends TestLogger {
@Test
public void testFailExternallyRightAway() throws Exception {
- final Task task = new TaskBuilder().build();
+ final Task task = createTaskBuilder().build();
task.failExternally(new Exception("fail externally"));
assertEquals(ExecutionState.FAILED, task.getExecutionState());
@@ -180,7 +192,7 @@ public class TaskTest extends TestLogger {
@Test
public void testLibraryCacheRegistrationFailed() throws Exception {
final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
- final Task task = new TaskBuilder()
+ final Task task = createTaskBuilder()
.setTaskManagerActions(taskManagerActions)
.setLibraryCacheManager(mock(LibraryCacheManager.class)) // inactive manager
.build();
@@ -226,7 +238,7 @@ public class TaskTest extends TestLogger {
FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
new String[0]);
- final Task task = new TaskBuilder()
+ final Task task = createTaskBuilder()
.setRequiredJarFileBlobKeys(Collections.singletonList(missingKey))
.setLibraryCacheManager(libraryCacheManager)
.build();
@@ -251,39 +263,35 @@ public class TaskTest extends TestLogger {
@Test
public void testExecutionFailsInNetworkRegistration() throws Exception {
- // mock a network manager that rejects registration
- final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
final ResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier();
final PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
- final NetworkEnvironment network = mock(NetworkEnvironment.class);
- when(network.getResultPartitionManager()).thenReturn(partitionManager);
- doThrow(new RuntimeException("buffers")).when(network).registerTask(any(Task.class));
-
final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
- final Task task = new TaskBuilder()
+ final Task task = new TaskBuilder(networkEnvironment)
.setTaskManagerActions(taskManagerActions)
.setConsumableNotifier(consumableNotifier)
.setPartitionProducerStateChecker(partitionProducerStateChecker)
- .setNetworkEnvironment(network)
.build();
+ // shut down the network to make the following task registration failure
+ networkEnvironment.shutdown();
+
// should fail
task.run();
// verify final state
assertEquals(ExecutionState.FAILED, task.getExecutionState());
assertTrue(task.isCanceledOrFailed());
- assertTrue(task.getFailureCause().getMessage().contains("buffers"));
+ assertTrue(task.getFailureCause().getMessage().contains("NetworkEnvironment is shut down"));
taskManagerActions.validateListenerMessage(
- ExecutionState.FAILED, task, new RuntimeException("buffers"));
+ ExecutionState.FAILED, task, new IllegalStateException("NetworkEnvironment is shut down"));
}
@Test
public void testInvokableInstantiationFailed() throws Exception {
final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
- final Task task = new TaskBuilder()
+ final Task task = createTaskBuilder()
.setTaskManagerActions(taskManagerActions)
.setInvokable(InvokableNonInstantiable.class)
.build();
@@ -303,7 +311,7 @@ public class TaskTest extends TestLogger {
@Test
public void testExecutionFailsInInvoke() throws Exception {
final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
- final Task task = new TaskBuilder()
+ final Task task = createTaskBuilder()
.setInvokable(InvokableWithExceptionInInvoke.class)
.setTaskManagerActions(taskManagerActions)
.build();
@@ -323,7 +331,7 @@ public class TaskTest extends TestLogger {
@Test
public void testFailWithWrappedException() throws Exception {
final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
- final Task task = new TaskBuilder()
+ final Task task = createTaskBuilder()
.setInvokable(FailingInvokableWithChainedException.class)
.setTaskManagerActions(taskManagerActions)
.build();
@@ -343,7 +351,7 @@ public class TaskTest extends TestLogger {
@Test
public void testCancelDuringInvoke() throws Exception {
final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
- final Task task = new TaskBuilder()
+ final Task task = createTaskBuilder()
.setInvokable(InvokableBlockingInInvoke.class)
.setTaskManagerActions(taskManagerActions)
.build();
@@ -371,7 +379,7 @@ public class TaskTest extends TestLogger {
@Test
public void testFailExternallyDuringInvoke() throws Exception {
final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
- final Task task = new TaskBuilder()
+ final Task task = createTaskBuilder()
.setInvokable(InvokableBlockingInInvoke.class)
.setTaskManagerActions(taskManagerActions)
.build();
@@ -397,7 +405,7 @@ public class TaskTest extends TestLogger {
@Test
public void testCanceledAfterExecutionFailedInInvoke() throws Exception {
final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
- final Task task = new TaskBuilder()
+ final Task task = createTaskBuilder()
.setInvokable(InvokableWithExceptionInInvoke.class)
.setTaskManagerActions(taskManagerActions)
.build();
@@ -418,7 +426,7 @@ public class TaskTest extends TestLogger {
@Test
public void testExecutionFailsAfterCanceling() throws Exception {
final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
- final Task task = new TaskBuilder()
+ final Task task = createTaskBuilder()
.setInvokable(InvokableWithExceptionOnTrigger.class)
.setTaskManagerActions(taskManagerActions)
.build();
@@ -449,7 +457,7 @@ public class TaskTest extends TestLogger {
@Test
public void testExecutionFailsAfterTaskMarkedFailed() throws Exception {
final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
- final Task task = new TaskBuilder()
+ final Task task = createTaskBuilder()
.setInvokable(InvokableWithExceptionOnTrigger.class)
.setTaskManagerActions(taskManagerActions)
.build();
@@ -478,7 +486,7 @@ public class TaskTest extends TestLogger {
@Test
public void testCancelTaskException() throws Exception {
- final Task task = new TaskBuilder()
+ final Task task = createTaskBuilder()
.setInvokable(InvokableWithCancelTaskExceptionInInvoke.class)
.build();
@@ -492,7 +500,7 @@ public class TaskTest extends TestLogger {
@Test
public void testCancelTaskExceptionAfterTaskMarkedFailed() throws Exception {
- final Task task = new TaskBuilder()
+ final Task task = createTaskBuilder()
.setInvokable(InvokableWithCancelTaskExceptionInInvoke.class)
.build();
@@ -523,7 +531,7 @@ public class TaskTest extends TestLogger {
final SingleInputGate inputGate = mock(SingleInputGate.class);
when(inputGate.getConsumedResultId()).thenReturn(resultId);
- final Task task = new TaskBuilder()
+ final Task task = createTaskBuilder()
.setInvokable(InvokableBlockingInInvoke.class)
.build();
@@ -571,8 +579,6 @@ public class TaskTest extends TestLogger {
final PartitionProducerStateChecker partitionChecker = mock(PartitionProducerStateChecker.class);
final ResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier();
- final NetworkEnvironment network = mock(NetworkEnvironment.class);
- when(network.getResultPartitionManager()).thenReturn(mock(ResultPartitionManager.class));
// Test all branches of trigger partition state check
{
@@ -580,9 +586,8 @@ public class TaskTest extends TestLogger {
setup();
// PartitionProducerDisposedException
- final Task task = new TaskBuilder()
+ final Task task = createTaskBuilder()
.setInvokable(InvokableBlockingInInvoke.class)
- .setNetworkEnvironment(network)
.setConsumableNotifier(consumableNotifier)
.setPartitionProducerStateChecker(partitionChecker)
.setExecutor(Executors.directExecutor())
@@ -602,9 +607,8 @@ public class TaskTest extends TestLogger {
setup();
// Any other exception
- final Task task = new TaskBuilder()
+ final Task task = createTaskBuilder()
.setInvokable(InvokableBlockingInInvoke.class)
- .setNetworkEnvironment(network)
.setConsumableNotifier(consumableNotifier)
.setPartitionProducerStateChecker(partitionChecker)
.setExecutor(Executors.directExecutor())
@@ -626,9 +630,8 @@ public class TaskTest extends TestLogger {
// TimeoutException handled special => retry
// Any other exception
- final Task task = new TaskBuilder()
+ final Task task = createTaskBuilder()
.setInvokable(InvokableBlockingInInvoke.class)
- .setNetworkEnvironment(network)
.setConsumableNotifier(consumableNotifier)
.setPartitionProducerStateChecker(partitionChecker)
.setExecutor(Executors.directExecutor())
@@ -664,9 +667,8 @@ public class TaskTest extends TestLogger {
setup();
// Success
- final Task task = new TaskBuilder()
+ final Task task = createTaskBuilder()
.setInvokable(InvokableBlockingInInvoke.class)
- .setNetworkEnvironment(network)
.setConsumableNotifier(consumableNotifier)
.setPartitionProducerStateChecker(partitionChecker)
.setExecutor(Executors.directExecutor())
@@ -711,7 +713,7 @@ public class TaskTest extends TestLogger {
config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL.key(), 5);
config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT.key(), 60 * 1000);
- final Task task = new TaskBuilder()
+ final Task task = createTaskBuilder()
.setInvokable(InvokableBlockingInCancel.class)
.setTaskManagerConfig(config)
.setTaskManagerActions(taskManagerActions)
@@ -738,7 +740,7 @@ public class TaskTest extends TestLogger {
config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, 5);
config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 50);
- final Task task = new TaskBuilder()
+ final Task task = createTaskBuilder()
.setInvokable(InvokableInterruptibleSharedLockInInvokeAndCancel.class)
.setTaskManagerConfig(config)
.setTaskManagerActions(taskManagerActions)
@@ -765,7 +767,7 @@ public class TaskTest extends TestLogger {
config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, 5);
config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 50);
- final Task task = new TaskBuilder()
+ final Task task = createTaskBuilder()
.setInvokable(InvokableUnInterruptibleBlockingInvoke.class)
.setTaskManagerConfig(config)
.setTaskManagerActions(taskManagerActions)
@@ -804,7 +806,7 @@ public class TaskTest extends TestLogger {
executionConfig.setTaskCancellationInterval(interval + 1337);
executionConfig.setTaskCancellationTimeout(timeout - 1337);
- final Task task = new TaskBuilder()
+ final Task task = createTaskBuilder()
.setInvokable(InvokableBlockingInInvoke.class)
.setTaskManagerConfig(config)
.setExecutionConfig(executionConfig)
@@ -917,13 +919,17 @@ public class TaskTest extends TestLogger {
}
}
- private final class TaskBuilder {
+ private TaskBuilder createTaskBuilder() {
+ return new TaskBuilder(networkEnvironment);
+ }
+
+ private static final class TaskBuilder {
private Class<? extends AbstractInvokable> invokable;
private TaskManagerActions taskManagerActions;
private LibraryCacheManager libraryCacheManager;
private ResultPartitionConsumableNotifier consumableNotifier;
private PartitionProducerStateChecker partitionProducerStateChecker;
- private NetworkEnvironment networkEnvironment;
+ private final NetworkEnvironment networkEnvironment;
private KvStateService kvStateService;
private Executor executor;
private Configuration taskManagerConfig;
@@ -940,10 +946,6 @@ public class TaskTest extends TestLogger {
consumableNotifier = new NoOpResultPartitionConsumableNotifier();
partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
- final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
- networkEnvironment = mock(NetworkEnvironment.class);
- when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager);
-
kvStateService = new KvStateService(new KvStateRegistry(), null, null);
executor = TestingUtils.defaultExecutor();
@@ -954,6 +956,10 @@ public class TaskTest extends TestLogger {
requiredJarFileBlobKeys = Collections.emptyList();
}
+ private TaskBuilder(NetworkEnvironment networkEnvironment) {
+ this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment);
+ }
+
TaskBuilder setInvokable(Class<? extends AbstractInvokable> invokable) {
this.invokable = invokable;
return this;
@@ -979,11 +985,6 @@ public class TaskTest extends TestLogger {
return this;
}
- TaskBuilder setNetworkEnvironment(NetworkEnvironment networkEnvironment) {
- this.networkEnvironment = networkEnvironment;
- return this;
- }
-
TaskBuilder setKvStateService(KvStateService kvStateService) {
this.kvStateService = kvStateService;
return this;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index cff1a26..1b2ec49 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
@@ -167,7 +168,7 @@ public class JvmExitOnFatalErrorTest {
final MemoryManager memoryManager = new MemoryManager(1024 * 1024, 1);
final IOManager ioManager = new IOManagerAsync();
- final NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
+ final NetworkEnvironment networkEnvironment = new NetworkEnvironmentBuilder().build();
final TaskManagerRuntimeInfo tmInfo = TaskManagerConfiguration.fromConfiguration(taskManagerConfig);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 9e1e793..24b1047 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
@@ -180,7 +181,7 @@ public class InterruptSensitiveRestoreTest {
StreamStateHandle state,
int mode) throws IOException {
- NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
+ NetworkEnvironment networkEnvironment = new NetworkEnvironmentBuilder().build();
Collection<KeyedStateHandle> keyedStateFromBackend = Collections.emptyList();
Collection<KeyedStateHandle> keyedStateFromStream = Collections.emptyList();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 566e740..8918b0a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
@@ -149,7 +150,7 @@ public class StreamTaskTerminationTest extends TestLogger {
final TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo();
- final NetworkEnvironment networkEnv = mock(NetworkEnvironment.class);
+ final NetworkEnvironment networkEnv = new NetworkEnvironmentBuilder().build();
BlobCacheService blobService =
new BlobCacheService(mock(PermanentBlobCache.class), mock(TransientBlobCache.class));
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 2fe91e0..e24949d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
@@ -899,8 +900,7 @@ public class StreamTaskTest extends TestLogger {
PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
Executor executor = mock(Executor.class);
- NetworkEnvironment network = mock(NetworkEnvironment.class);
- when(network.getResultPartitionManager()).thenReturn(partitionManager);
+ NetworkEnvironment network = new NetworkEnvironmentBuilder().build();
JobInformation jobInformation = new JobInformation(
new JobID(),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
index 32b3392..222133a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
@@ -43,11 +43,11 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -259,12 +259,10 @@ public class SynchronousCheckpointITCase {
LibraryCacheManager libCache = mock(LibraryCacheManager.class);
when(libCache.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
- ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
ResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier();
PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
Executor executor = mock(Executor.class);
- NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
- when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager);
+ NetworkEnvironment networkEnvironment = new NetworkEnvironmentBuilder().build();
TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index 6733858..244c8aa 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
@@ -223,7 +224,7 @@ public class TaskCheckpointingBehaviourTest extends TestLogger {
TestStreamTask.class.getName(),
taskConfig);
- NetworkEnvironment network = mock(NetworkEnvironment.class);
+ NetworkEnvironment network = new NetworkEnvironmentBuilder().build();
BlobCacheService blobService =
new BlobCacheService(mock(PermanentBlobCache.class), mock(TransientBlobCache.class));