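You are viewing a plain text version of this content. The canonical (hyperlinked) version is available in the original mailing-list archive; the link itself is not preserved in this plain-text copy.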
Posted to commits@flink.apache.org by ch...@apache.org on 2020/05/28 15:46:42 UTC

[flink] 06/07: [FLINK-17558][tests] Extract ShuffleEnvironment/PartitionTracker setup

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0efeea3ac5d3c04b830e87209b3e7357c8826931
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed May 20 14:13:23 2020 +0200

    [FLINK-17558][tests] Extract ShuffleEnvironment/PartitionTracker setup
---
 .../TaskExecutorPartitionLifecycleTest.java        | 25 ++++++++++++++++------
 1 file changed, 18 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index 4f6ee44..81b651f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -244,6 +244,24 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 	}
 
 	private void testPartitionRelease(PartitionTrackerSetup partitionTrackerSetup, TestAction testAction) throws Exception {
+		final TestingTaskExecutorPartitionTracker partitionTracker = new TestingTaskExecutorPartitionTracker();
+		final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>();
+		partitionTracker.setStartTrackingPartitionsConsumer((jobId, partitionInfo) -> startTrackingFuture.complete(partitionInfo.getResultPartitionId()));
+		partitionTrackerSetup.accept(partitionTracker);
+
+		internalTestPartitionRelease(
+			partitionTracker,
+			new NettyShuffleEnvironmentBuilder().build(),
+			startTrackingFuture,
+			testAction
+		);
+	}
+
+	private void internalTestPartitionRelease(
+			TaskExecutorPartitionTracker partitionTracker,
+			ShuffleEnvironment<?, ?> shuffleEnvironment,
+			CompletableFuture<ResultPartitionID> startTrackingFuture,
+			TestAction testAction) throws Exception {
 
 		final ResultPartitionDeploymentDescriptor taskResultPartitionDescriptor =
 			PartitionTestUtils.createPartitionDeploymentDescriptor(ResultPartitionType.BLOCKING);
@@ -276,8 +294,6 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 			new File[]{tmp.newFolder()},
 			Executors.directExecutor());
 
-		final ShuffleEnvironment<?, ?> shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();
-
 		final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
 			.setTaskSlotTable(taskSlotTable)
 			.setTaskStateManager(localStateStoresManager)
@@ -301,11 +317,6 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 			})
 			.build();
 
-		final TestingTaskExecutorPartitionTracker partitionTracker = new TestingTaskExecutorPartitionTracker();
-		final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>();
-		partitionTracker.setStartTrackingPartitionsConsumer((jobId, partitionInfo) -> startTrackingFuture.complete(partitionInfo.getResultPartitionId()));
-		partitionTrackerSetup.accept(partitionTracker);
-
 		final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(taskManagerServices, partitionTracker);
 
 		final CompletableFuture<SlotReport> initialSlotReportFuture = new CompletableFuture<>();