You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/05/28 15:46:41 UTC

[flink] 05/07: [FLINK-17558][tests] Simplify partition tracker setup

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cce971167d4de726dcdb7c9a6e4f0f648fb61266
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed May 20 14:11:09 2020 +0200

    [FLINK-17558][tests] Simplify partition tracker setup
---
 .../TaskExecutorPartitionLifecycleTest.java        | 43 ++++++++--------------
 1 file changed, 16 insertions(+), 27 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index b8f43d0..4f6ee44 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -206,14 +206,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 
 	@Test
 	public void testPartitionReleaseAfterJobMasterDisconnect() throws Exception {
+		final CompletableFuture<JobID> releasePartitionsForJobFuture = new CompletableFuture<>();
 		testPartitionRelease(
-			partitionTracker -> {
-				final CompletableFuture<JobID> releasePartitionsForJobFuture = new CompletableFuture<>();
-				partitionTracker.setStopTrackingAndReleaseAllPartitionsConsumer(releasePartitionsForJobFuture::complete);
-				return releasePartitionsForJobFuture;
-			},
-			(jobId, partitionId, taskExecutor, taskExecutorGateway, releasePartitionsForJobFuture) -> {
-
+			partitionTracker -> partitionTracker.setStopTrackingAndReleaseAllPartitionsConsumer(releasePartitionsForJobFuture::complete),
+			(jobId, partitionId, taskExecutor, taskExecutorGateway) -> {
 				taskExecutorGateway.disconnectJobManager(jobId, new Exception("test"));
 
 				assertThat(releasePartitionsForJobFuture.get(), equalTo(jobId));
@@ -223,13 +219,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 
 	@Test
 	public void testPartitionReleaseAfterReleaseCall() throws Exception {
+		final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>();
 		testPartitionRelease(
-			partitionTracker -> {
-				final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>();
-				partitionTracker.setStopTrackingAndReleasePartitionsConsumer(releasePartitionsFuture::complete);
-				return releasePartitionsFuture;
-			},
-			(jobId, partitionId, taskExecutor, taskExecutorGateway, releasePartitionsFuture) -> {
+			partitionTracker -> partitionTracker.setStopTrackingAndReleasePartitionsConsumer(releasePartitionsFuture::complete),
+			(jobId, partitionId, taskExecutor, taskExecutorGateway) -> {
 				taskExecutorGateway.releaseOrPromotePartitions(jobId, Collections.singleton(partitionId), Collections.emptySet());
 
 				assertThat(releasePartitionsFuture.get(), hasItems(partitionId));
@@ -239,13 +232,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 
 	@Test
 	public void testPartitionPromotion() throws Exception {
+		final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>();
 		testPartitionRelease(
-			partitionTracker -> {
-				final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>();
-				partitionTracker.setPromotePartitionsConsumer(releasePartitionsFuture::complete);
-				return releasePartitionsFuture;
-			},
-			(jobId, partitionId, taskExecutor, taskExecutorGateway, releasePartitionsFuture) -> {
+			partitionTracker -> partitionTracker.setPromotePartitionsConsumer(releasePartitionsFuture::complete),
+			(jobId, partitionId, taskExecutor, taskExecutorGateway) -> {
 				taskExecutorGateway.releaseOrPromotePartitions(jobId, Collections.emptySet(), Collections.singleton(partitionId));
 
 				assertThat(releasePartitionsFuture.get(), hasItems(partitionId));
@@ -253,7 +243,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 		);
 	}
 
-	private <C> void testPartitionRelease(PartitionTrackerSetup<C> partitionTrackerSetup, TestAction<C> testAction) throws Exception {
+	private void testPartitionRelease(PartitionTrackerSetup partitionTrackerSetup, TestAction testAction) throws Exception {
 
 		final ResultPartitionDeploymentDescriptor taskResultPartitionDescriptor =
 			PartitionTestUtils.createPartitionDeploymentDescriptor(ResultPartitionType.BLOCKING);
@@ -314,7 +304,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 		final TestingTaskExecutorPartitionTracker partitionTracker = new TestingTaskExecutorPartitionTracker();
 		final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>();
 		partitionTracker.setStartTrackingPartitionsConsumer((jobId, partitionInfo) -> startTrackingFuture.complete(partitionInfo.getResultPartitionId()));
-		C partitionTrackerSetupResult = partitionTrackerSetup.accept(partitionTracker);
+		partitionTrackerSetup.accept(partitionTracker);
 
 		final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(taskManagerServices, partitionTracker);
 
@@ -394,8 +384,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 				jobId,
 				taskResultPartitionDescriptor.getShuffleDescriptor().getResultPartitionID(),
 				taskExecutor,
-				taskExecutorGateway,
-				partitionTrackerSetupResult);
+				taskExecutorGateway);
 		} finally {
 			RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
 		}
@@ -447,12 +436,12 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 	}
 
 	@FunctionalInterface
-	private interface PartitionTrackerSetup<C> {
-		C accept(TestingTaskExecutorPartitionTracker partitionTracker) throws Exception;
+	private interface PartitionTrackerSetup {
+		void accept(TestingTaskExecutorPartitionTracker partitionTracker) throws Exception;
 	}
 
 	@FunctionalInterface
-	private interface TestAction<C> {
-		void accept(JobID jobId, ResultPartitionID resultPartitionId, TaskExecutor taskExecutor, TaskExecutorGateway taskExecutorGateway, C partitionTrackerSetupResult) throws Exception;
+	private interface TestAction {
+		void accept(JobID jobId, ResultPartitionID resultPartitionId, TaskExecutor taskExecutor, TaskExecutorGateway taskExecutorGateway) throws Exception;
 	}
 }