You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/05/28 15:46:42 UTC
[flink] 06/07: [FLINK-17558][tests] Extract ShuffleEnvironment/PartitionTracker setup
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0efeea3ac5d3c04b830e87209b3e7357c8826931
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed May 20 14:13:23 2020 +0200
[FLINK-17558][tests] Extract ShuffleEnvironment/PartitionTracker setup
---
.../TaskExecutorPartitionLifecycleTest.java | 25 ++++++++++++++++------
1 file changed, 18 insertions(+), 7 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index 4f6ee44..81b651f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -244,6 +244,24 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
}
private void testPartitionRelease(PartitionTrackerSetup partitionTrackerSetup, TestAction testAction) throws Exception {
+ final TestingTaskExecutorPartitionTracker partitionTracker = new TestingTaskExecutorPartitionTracker();
+ final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>();
+ partitionTracker.setStartTrackingPartitionsConsumer((jobId, partitionInfo) -> startTrackingFuture.complete(partitionInfo.getResultPartitionId()));
+ partitionTrackerSetup.accept(partitionTracker);
+
+ internalTestPartitionRelease(
+ partitionTracker,
+ new NettyShuffleEnvironmentBuilder().build(),
+ startTrackingFuture,
+ testAction
+ );
+ }
+
+ private void internalTestPartitionRelease(
+ TaskExecutorPartitionTracker partitionTracker,
+ ShuffleEnvironment<?, ?> shuffleEnvironment,
+ CompletableFuture<ResultPartitionID> startTrackingFuture,
+ TestAction testAction) throws Exception {
final ResultPartitionDeploymentDescriptor taskResultPartitionDescriptor =
PartitionTestUtils.createPartitionDeploymentDescriptor(ResultPartitionType.BLOCKING);
@@ -276,8 +294,6 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
new File[]{tmp.newFolder()},
Executors.directExecutor());
- final ShuffleEnvironment<?, ?> shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();
-
final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
.setTaskSlotTable(taskSlotTable)
.setTaskStateManager(localStateStoresManager)
@@ -301,11 +317,6 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
})
.build();
- final TestingTaskExecutorPartitionTracker partitionTracker = new TestingTaskExecutorPartitionTracker();
- final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>();
- partitionTracker.setStartTrackingPartitionsConsumer((jobId, partitionInfo) -> startTrackingFuture.complete(partitionInfo.getResultPartitionId()));
- partitionTrackerSetup.accept(partitionTracker);
-
final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(taskManagerServices, partitionTracker);
final CompletableFuture<SlotReport> initialSlotReportFuture = new CompletableFuture<>();