You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/05/28 15:46:41 UTC
[flink] 05/07: [FLINK-17558][tests] Simplify partition tracker setup
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
commit cce971167d4de726dcdb7c9a6e4f0f648fb61266
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed May 20 14:11:09 2020 +0200
[FLINK-17558][tests] Simplify partition tracker setup
---
.../TaskExecutorPartitionLifecycleTest.java | 43 ++++++++--------------
1 file changed, 16 insertions(+), 27 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index b8f43d0..4f6ee44 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -206,14 +206,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
@Test
public void testPartitionReleaseAfterJobMasterDisconnect() throws Exception {
+ final CompletableFuture<JobID> releasePartitionsForJobFuture = new CompletableFuture<>();
testPartitionRelease(
- partitionTracker -> {
- final CompletableFuture<JobID> releasePartitionsForJobFuture = new CompletableFuture<>();
- partitionTracker.setStopTrackingAndReleaseAllPartitionsConsumer(releasePartitionsForJobFuture::complete);
- return releasePartitionsForJobFuture;
- },
- (jobId, partitionId, taskExecutor, taskExecutorGateway, releasePartitionsForJobFuture) -> {
-
+ partitionTracker -> partitionTracker.setStopTrackingAndReleaseAllPartitionsConsumer(releasePartitionsForJobFuture::complete),
+ (jobId, partitionId, taskExecutor, taskExecutorGateway) -> {
taskExecutorGateway.disconnectJobManager(jobId, new Exception("test"));
assertThat(releasePartitionsForJobFuture.get(), equalTo(jobId));
@@ -223,13 +219,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
@Test
public void testPartitionReleaseAfterReleaseCall() throws Exception {
+ final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>();
testPartitionRelease(
- partitionTracker -> {
- final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>();
- partitionTracker.setStopTrackingAndReleasePartitionsConsumer(releasePartitionsFuture::complete);
- return releasePartitionsFuture;
- },
- (jobId, partitionId, taskExecutor, taskExecutorGateway, releasePartitionsFuture) -> {
+ partitionTracker -> partitionTracker.setStopTrackingAndReleasePartitionsConsumer(releasePartitionsFuture::complete),
+ (jobId, partitionId, taskExecutor, taskExecutorGateway) -> {
taskExecutorGateway.releaseOrPromotePartitions(jobId, Collections.singleton(partitionId), Collections.emptySet());
assertThat(releasePartitionsFuture.get(), hasItems(partitionId));
@@ -239,13 +232,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
@Test
public void testPartitionPromotion() throws Exception {
+ final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>();
testPartitionRelease(
- partitionTracker -> {
- final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>();
- partitionTracker.setPromotePartitionsConsumer(releasePartitionsFuture::complete);
- return releasePartitionsFuture;
- },
- (jobId, partitionId, taskExecutor, taskExecutorGateway, releasePartitionsFuture) -> {
+ partitionTracker -> partitionTracker.setPromotePartitionsConsumer(releasePartitionsFuture::complete),
+ (jobId, partitionId, taskExecutor, taskExecutorGateway) -> {
taskExecutorGateway.releaseOrPromotePartitions(jobId, Collections.emptySet(), Collections.singleton(partitionId));
assertThat(releasePartitionsFuture.get(), hasItems(partitionId));
@@ -253,7 +243,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
);
}
- private <C> void testPartitionRelease(PartitionTrackerSetup<C> partitionTrackerSetup, TestAction<C> testAction) throws Exception {
+ private void testPartitionRelease(PartitionTrackerSetup partitionTrackerSetup, TestAction testAction) throws Exception {
final ResultPartitionDeploymentDescriptor taskResultPartitionDescriptor =
PartitionTestUtils.createPartitionDeploymentDescriptor(ResultPartitionType.BLOCKING);
@@ -314,7 +304,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
final TestingTaskExecutorPartitionTracker partitionTracker = new TestingTaskExecutorPartitionTracker();
final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>();
partitionTracker.setStartTrackingPartitionsConsumer((jobId, partitionInfo) -> startTrackingFuture.complete(partitionInfo.getResultPartitionId()));
- C partitionTrackerSetupResult = partitionTrackerSetup.accept(partitionTracker);
+ partitionTrackerSetup.accept(partitionTracker);
final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(taskManagerServices, partitionTracker);
@@ -394,8 +384,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
jobId,
taskResultPartitionDescriptor.getShuffleDescriptor().getResultPartitionID(),
taskExecutor,
- taskExecutorGateway,
- partitionTrackerSetupResult);
+ taskExecutorGateway);
} finally {
RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
}
@@ -447,12 +436,12 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
}
@FunctionalInterface
- private interface PartitionTrackerSetup<C> {
- C accept(TestingTaskExecutorPartitionTracker partitionTracker) throws Exception;
+ private interface PartitionTrackerSetup {
+ void accept(TestingTaskExecutorPartitionTracker partitionTracker) throws Exception;
}
@FunctionalInterface
- private interface TestAction<C> {
- void accept(JobID jobId, ResultPartitionID resultPartitionId, TaskExecutor taskExecutor, TaskExecutorGateway taskExecutorGateway, C partitionTrackerSetupResult) throws Exception;
+ private interface TestAction {
+ void accept(JobID jobId, ResultPartitionID resultPartitionId, TaskExecutor taskExecutor, TaskExecutorGateway taskExecutorGateway) throws Exception;
}
}