You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/08/02 07:44:12 UTC
[flink] 15/15: [FLINK-13371][coordination] Prevent leaks of
blocking partitions
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b5ab84c6238ff1f69f2151ed580410ae4c63acd7
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Aug 1 14:27:28 2019 +0200
[FLINK-13371][coordination] Prevent leaks of blocking partitions
---
.../flink/runtime/executiongraph/Execution.java | 71 +++--
.../runtime/executiongraph/ExecutionGraph.java | 2 +-
.../runtime/executiongraph/ExecutionVertex.java | 4 +-
.../io/network/partition/PartitionTracker.java | 5 +
.../io/network/partition/PartitionTrackerImpl.java | 7 +
.../flink/runtime/shuffle/ProducerDescriptor.java | 4 +-
.../ExecutionGraphDeploymentTest.java | 2 +-
.../ExecutionPartitionLifecycleTest.java | 324 +++++++++++++++++++++
.../runtime/executiongraph/ExecutionTest.java | 127 --------
.../io/network/partition/NoOpPartitionTracker.java | 4 +
.../io/network/partition/PartitionTestUtils.java | 2 +-
.../partition/PartitionTrackerImplTest.java | 7 +-
.../network/partition/TestingPartitionTracker.java | 10 +
.../TaskExecutorPartitionLifecycleTest.java | 51 +++-
14 files changed, 456 insertions(+), 164 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index af94188..d8a1b6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -56,6 +56,7 @@ import org.apache.flink.runtime.messages.StackTraceSampleResponse;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ProducerDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
@@ -1013,7 +1014,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
}
void markFailed(Throwable t, Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
- processFail(t, true, userAccumulators, metrics);
+ // skip release of partitions since this is only called if the TM actually sent the FAILED state update
+ // in this case all partitions have already been cleaned up
+ processFail(t, true, userAccumulators, metrics, false);
}
@VisibleForTesting
@@ -1059,7 +1062,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
else if (current == CANCELING) {
// we sent a cancel call, and the task manager finished before it arrived. We
// will never get a CANCELED call back from the job manager
- completeCancelling(userAccumulators, metrics);
+ completeCancelling(userAccumulators, metrics, true);
return;
}
else if (current == CANCELED || current == FAILED) {
@@ -1096,10 +1099,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
}
void completeCancelling() {
- completeCancelling(null, null);
+ completeCancelling(null, null, true);
}
- void completeCancelling(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
+ void completeCancelling(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics, boolean releasePartitions) {
// the taskmanagers can themselves cancel tasks without an external trigger, if they find that the
// network stack is canceled (for example by a failing / canceling receiver or sender)
@@ -1117,7 +1120,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
updateAccumulatorsAndMetrics(userAccumulators, metrics);
if (transitionState(current, CANCELED)) {
- finishCancellation();
+ finishCancellation(releasePartitions);
return;
}
@@ -1136,11 +1139,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
}
}
- private void finishCancellation() {
+ private void finishCancellation(boolean releasePartitions) {
releaseAssignedResource(new FlinkException("Execution " + this + " was cancelled."));
vertex.getExecutionGraph().deregisterExecution(this);
- // release partitions on TM in case the Task finished while we where already CANCELING
- stopTrackingAndReleasePartitions();
+ handlePartitionCleanup(releasePartitions, releasePartitions);
}
void cachePartitionInfo(PartitionInfo partitionInfo) {
@@ -1160,10 +1162,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
// --------------------------------------------------------------------------------------------
private boolean processFail(Throwable t, boolean isCallback) {
- return processFail(t, isCallback, null, null);
+ return processFail(t, isCallback, null, null, true);
}
- private boolean processFail(Throwable t, boolean isCallback, Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
+ private boolean processFail(Throwable t, boolean isCallback, Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics, boolean releasePartitions) {
// damn, we failed. This means only that we keep our books and notify our parent JobExecutionVertex
// the actual computation on the task manager is cleaned up by the TaskManager that noticed the failure
@@ -1187,7 +1189,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
}
if (current == CANCELING) {
- completeCancelling(userAccumulators, metrics);
+ completeCancelling(userAccumulators, metrics, true);
return false;
}
@@ -1199,7 +1201,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
releaseAssignedResource(t);
vertex.getExecutionGraph().deregisterExecution(this);
- stopTrackingAndReleasePartitions();
+ handlePartitionCleanup(releasePartitions, releasePartitions);
if (!isCallback && (current == RUNNING || current == DEPLOYING)) {
if (LOG.isDebugEnabled()) {
@@ -1304,16 +1306,51 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
}
}
- void stopTrackingAndReleasePartitions() {
+ void handlePartitionCleanup(boolean releasePipelinedPartitions, boolean releaseBlockingPartitions) {
+ if (releasePipelinedPartitions) {
+ sendReleaseIntermediateResultPartitionsRpcCall();
+ }
+
+ final Collection<ResultPartitionID> partitionIds = getPartitionIds();
+ final PartitionTracker partitionTracker = getVertex().getExecutionGraph().getPartitionTracker();
+
+ if (!partitionIds.isEmpty()) {
+ if (releaseBlockingPartitions) {
+ LOG.info("Discarding the results produced by task execution {}.", attemptId);
+ partitionTracker.stopTrackingAndReleasePartitions(partitionIds);
+ } else {
+ partitionTracker.stopTrackingPartitions(partitionIds);
+ }
+ }
+ }
+
+ private Collection<ResultPartitionID> getPartitionIds() {
+ return producedPartitions.values().stream()
+ .map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor)
+ .map(ShuffleDescriptor::getResultPartitionID)
+ .collect(Collectors.toList());
+ }
+
+ private void sendReleaseIntermediateResultPartitionsRpcCall() {
LOG.info("Discarding the results produced by task execution {}.", attemptId);
- if (producedPartitions != null && producedPartitions.size() > 0) {
- final PartitionTracker partitionTracker = getVertex().getExecutionGraph().getPartitionTracker();
- final List<ResultPartitionID> producedPartitionIds = producedPartitions.values().stream()
+ final LogicalSlot slot = assignedResource;
+
+ if (slot != null) {
+ final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
+
+ final ShuffleMaster<?> shuffleMaster = getVertex().getExecutionGraph().getShuffleMaster();
+
+ Collection<ResultPartitionID> partitionIds = producedPartitions.values().stream()
+ .filter(resultPartitionDeploymentDescriptor -> resultPartitionDeploymentDescriptor.getPartitionType().isPipelined())
.map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor)
+ .peek(shuffleMaster::releasePartitionExternally)
.map(ShuffleDescriptor::getResultPartitionID)
.collect(Collectors.toList());
- partitionTracker.stopTrackingAndReleasePartitions(producedPartitionIds);
+ if (!partitionIds.isEmpty()) {
+ // TODO For some tests this could be a problem when querying too early if all resources were released
+ taskManagerGateway.releasePartitions(getVertex().getJobId(), partitionIds);
+ }
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index f779fd2..0984274 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1525,7 +1525,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
case CANCELED:
// this deserialization is exception-free
accumulators = deserializeAccumulators(state);
- attempt.completeCancelling(accumulators, state.getIOMetrics());
+ attempt.completeCancelling(accumulators, state.getIOMetrics(), false);
return true;
case FAILED:
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 03c68f8..6d262ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -604,7 +604,9 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
if (oldState.isTerminal()) {
if (oldState == FINISHED) {
- oldExecution.stopTrackingAndReleasePartitions();
+ // pipelined partitions are released in Execution#cancel(), covering both job failures and vertex resets
+ // do not release pipelined partitions here to save RPC calls
+ oldExecution.handlePartitionCleanup(false, true);
getExecutionGraph().getPartitionReleaseStrategy().vertexUnfinished(executionVertexId);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
index 3697fb8..268f4f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
@@ -46,6 +46,11 @@ public interface PartitionTracker {
void stopTrackingAndReleasePartitions(Collection<ResultPartitionID> resultPartitionIds);
/**
+ * Stops the tracking of the given partitions.
+ */
+ void stopTrackingPartitions(Collection<ResultPartitionID> resultPartitionIds);
+
+ /**
* Releases all partitions for the given task executor ID, and stops the tracking of partitions that were released.
*/
void stopTrackingAndReleasePartitionsFor(ResourceID producingTaskExecutorId);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
index f772b37..2e7f421 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
@@ -106,6 +106,13 @@ public class PartitionTrackerImpl implements PartitionTracker {
}
@Override
+ public void stopTrackingPartitions(Collection<ResultPartitionID> resultPartitionIds) {
+ Preconditions.checkNotNull(resultPartitionIds);
+
+ resultPartitionIds.forEach(this::internalStopTrackingPartition);
+ }
+
+ @Override
public void stopTrackingAndReleasePartitionsFor(ResourceID producingTaskExecutorId) {
Preconditions.checkNotNull(producingTaskExecutorId);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ProducerDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ProducerDescriptor.java
index ad3fc48..ec32178 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ProducerDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ProducerDescriptor.java
@@ -63,11 +63,11 @@ public class ProducerDescriptor {
this.dataPort = dataPort;
}
- ResourceID getProducerLocation() {
+ public ResourceID getProducerLocation() {
return producerLocation;
}
- ExecutionAttemptID getProducerExecutionId() {
+ public ExecutionAttemptID getProducerExecutionId() {
return producerExecutionId;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index d0cfa00..d10fcbd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -382,7 +382,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
Execution execution1 = executions.values().iterator().next();
execution1.cancel();
- execution1.completeCancelling(accumulators, ioMetrics);
+ execution1.completeCancelling(accumulators, ioMetrics, false);
assertEquals(ioMetrics, execution1.getIOMetrics());
assertEquals(accumulators, execution1.getUserAccumulators());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
new file mode 100644
index 0000000..ab4048d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.TestingPartitionTracker;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ProducerDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the partition-lifecycle handling (tracking, stop-tracking, and release) of {@link Execution}.
+ */
+public class ExecutionPartitionLifecycleTest extends TestLogger {
+
+ @ClassRule
+ public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE =
+ new TestingComponentMainThreadExecutor.Resource();
+
+ private Execution execution;
+ private ResultPartitionDeploymentDescriptor descriptor;
+ private ResourceID taskExecutorResourceId;
+ private JobID jobId;
+
+ @Test
+ public void testPartitionReleaseOnFinishWhileCanceling() throws Exception {
+ final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
+ final CompletableFuture<Tuple2<JobID, Collection<ResultPartitionID>>> releasePartitionsCallFuture = new CompletableFuture<>();
+ taskManagerGateway.setReleasePartitionsConsumer(((jobID, partitionIds) -> releasePartitionsCallFuture.complete(Tuple2.of(jobID, partitionIds))));
+
+ final TestingShuffleMaster testingShuffleMaster = new TestingShuffleMaster();
+
+ setupExecutionGraphAndStartRunningJob(ResultPartitionType.PIPELINED, NoOpPartitionTracker.INSTANCE, taskManagerGateway, testingShuffleMaster);
+
+ execution.cancel();
+ assertFalse(releasePartitionsCallFuture.isDone());
+
+ execution.markFinished();
+ assertTrue(releasePartitionsCallFuture.isDone());
+
+ final Tuple2<JobID, Collection<ResultPartitionID>> releasePartitionsCall = releasePartitionsCallFuture.get();
+ assertEquals(jobId, releasePartitionsCall.f0);
+ assertEquals(Collections.singletonList(descriptor.getShuffleDescriptor().getResultPartitionID()), releasePartitionsCall.f1);
+
+ assertEquals(1, testingShuffleMaster.externallyReleasedPartitions.size());
+ assertEquals(descriptor.getShuffleDescriptor(), testingShuffleMaster.externallyReleasedPartitions.poll());
+ }
+
+ private enum PartitionReleaseResult {
+ NONE,
+ STOP_TRACKING,
+ STOP_TRACKING_AND_RELEASE
+ }
+
+ @Test
+ public void testPartitionTrackedAndNotReleasedWhenFinished() throws Exception {
+ testPartitionTrackingForStateTransition(Execution::markFinished, PartitionReleaseResult.NONE);
+ }
+
+ @Test
+ public void testPartitionNotTrackedAndNotReleasedWhenCanceledByTM() throws Exception {
+ testPartitionTrackingForStateTransition(
+ execution -> {
+ execution.cancel();
+ execution.completeCancelling(Collections.emptyMap(), new IOMetrics(0, 0, 0, 0), false);
+ },
+ PartitionReleaseResult.STOP_TRACKING);
+ }
+
+ @Test
+ public void testPartitionNotTrackedAndReleasedWhenCanceledByJM() throws Exception {
+ testPartitionTrackingForStateTransition(
+ execution -> {
+ execution.cancel();
+ execution.completeCancelling();
+ },
+ PartitionReleaseResult.STOP_TRACKING_AND_RELEASE);
+ }
+
+ @Test
+ public void testPartitionNotTrackedAndNotReleasedWhenFailedByTM() throws Exception {
+ testPartitionTrackingForStateTransition(
+ execution -> execution.markFailed(
+ new Exception("Test exception"),
+ Collections.emptyMap(),
+ new IOMetrics(0, 0, 0, 0)),
+ PartitionReleaseResult.STOP_TRACKING);
+ }
+
+ @Test
+ public void testPartitionNotTrackedAndReleasedWhenFailedByJM() throws Exception {
+ testPartitionTrackingForStateTransition(
+ execution -> execution.markFailed(new Exception("Test exception")),
+ PartitionReleaseResult.STOP_TRACKING_AND_RELEASE);
+ }
+
+ private void testPartitionTrackingForStateTransition(final Consumer<Execution> stateTransition, final PartitionReleaseResult partitionReleaseResult) throws Exception {
+ CompletableFuture<Tuple2<ResourceID, ResultPartitionDeploymentDescriptor>> partitionStartTrackingFuture = new CompletableFuture<>();
+ CompletableFuture<Collection<ResultPartitionID>> partitionStopTrackingFuture = new CompletableFuture<>();
+ CompletableFuture<Collection<ResultPartitionID>> partitionStopTrackingAndReleaseFuture = new CompletableFuture<>();
+ final TestingPartitionTracker partitionTracker = new TestingPartitionTracker();
+ partitionTracker.setStartTrackingPartitionsConsumer(
+ (resourceID, resultPartitionDeploymentDescriptor) ->
+ partitionStartTrackingFuture.complete(Tuple2.of(resourceID, resultPartitionDeploymentDescriptor))
+ );
+ partitionTracker.setStopTrackingPartitionsConsumer(partitionStopTrackingFuture::complete);
+ partitionTracker.setStopTrackingAndReleasePartitionsConsumer(partitionStopTrackingAndReleaseFuture::complete);
+
+ setupExecutionGraphAndStartRunningJob(ResultPartitionType.BLOCKING, partitionTracker, new SimpleAckingTaskManagerGateway(), NettyShuffleMaster.INSTANCE);
+
+ Tuple2<ResourceID, ResultPartitionDeploymentDescriptor> startTrackingCall = partitionStartTrackingFuture.get();
+ assertThat(startTrackingCall.f0, equalTo(taskExecutorResourceId));
+ assertThat(startTrackingCall.f1, equalTo(descriptor));
+
+ stateTransition.accept(execution);
+
+ switch (partitionReleaseResult) {
+ case NONE:
+ assertFalse(partitionStopTrackingFuture.isDone());
+ assertFalse(partitionStopTrackingAndReleaseFuture.isDone());
+ break;
+ case STOP_TRACKING:
+ assertTrue(partitionStopTrackingFuture.isDone());
+ assertFalse(partitionStopTrackingAndReleaseFuture.isDone());
+ final Collection<ResultPartitionID> stopTrackingCall = partitionStopTrackingFuture.get();
+ assertEquals(Collections.singletonList(descriptor.getShuffleDescriptor().getResultPartitionID()), stopTrackingCall);
+ break;
+ case STOP_TRACKING_AND_RELEASE:
+ assertFalse(partitionStopTrackingFuture.isDone());
+ assertTrue(partitionStopTrackingAndReleaseFuture.isDone());
+ final Collection<ResultPartitionID> stopTrackingAndReleaseCall = partitionStopTrackingAndReleaseFuture.get();
+ assertEquals(Collections.singletonList(descriptor.getShuffleDescriptor().getResultPartitionID()), stopTrackingAndReleaseCall);
+ break;
+ }
+ }
+
+ private void setupExecutionGraphAndStartRunningJob(ResultPartitionType resultPartitionType, PartitionTracker partitionTracker, TaskManagerGateway taskManagerGateway, ShuffleMaster<?> shuffleMaster) throws JobException, JobExecutionException {
+ final JobVertex producerVertex = createNoOpJobVertex();
+ final JobVertex consumerVertex = createNoOpJobVertex();
+ consumerVertex.connectNewDataSetAsInput(producerVertex, DistributionPattern.ALL_TO_ALL, resultPartitionType);
+
+ final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+
+ final SlotProvider slotProvider = new SlotProvider() {
+ @Override
+ public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) {
+ return CompletableFuture.completedFuture(new SimpleSlot(
+ new SingleSlotTestingSlotOwner(),
+ taskManagerLocation,
+ 0,
+ taskManagerGateway));
+ }
+
+ @Override
+ public void cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
+ }
+ };
+
+ final ExecutionGraph executionGraph = ExecutionGraphBuilder.buildGraph(
+ null,
+ new JobGraph(new JobID(), "test job", producerVertex, consumerVertex),
+ new Configuration(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
+ slotProvider,
+ ExecutionPartitionLifecycleTest.class.getClassLoader(),
+ new StandaloneCheckpointRecoveryFactory(),
+ Time.seconds(10),
+ new NoRestartStrategy(),
+ new UnregisteredMetricsGroup(),
+ VoidBlobWriter.getInstance(),
+ Time.seconds(10),
+ log,
+ shuffleMaster,
+ partitionTracker);
+
+ executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+ final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(producerVertex.getID());
+ final ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
+ execution = executionVertex.getCurrentExecutionAttempt();
+
+ execution.allocateResourcesForExecution(
+ executionGraph.getSlotProviderStrategy(),
+ LocationPreferenceConstraint.ALL,
+ Collections.emptySet());
+
+ execution.deploy();
+ execution.switchToRunning();
+
+ final IntermediateResultPartitionID expectedIntermediateResultPartitionId = executionJobVertex
+ .getProducedDataSets()[0]
+ .getPartitions()[0]
+ .getPartitionId();
+
+ descriptor = execution
+ .getResultPartitionDeploymentDescriptor(expectedIntermediateResultPartitionId).get();
+ taskExecutorResourceId = taskManagerLocation.getResourceID();
+ jobId = executionGraph.getJobID();
+ }
+
+ @Nonnull
+ private JobVertex createNoOpJobVertex() {
+ final JobVertex jobVertex = new JobVertex("Test vertex", new JobVertexID());
+ jobVertex.setInvokableClass(NoOpInvokable.class);
+
+ return jobVertex;
+ }
+
+ /**
+ * Slot owner which records the first returned slot.
+ */
+ private static final class SingleSlotTestingSlotOwner implements SlotOwner {
+
+ final CompletableFuture<LogicalSlot> returnedSlot = new CompletableFuture<>();
+
+ @Override
+ public void returnLogicalSlot(LogicalSlot logicalSlot) {
+ returnedSlot.complete(logicalSlot);
+ }
+ }
+
+ private static class TestingShuffleMaster implements ShuffleMaster<ShuffleDescriptor> {
+
+ final Queue<ShuffleDescriptor> externallyReleasedPartitions = new ArrayBlockingQueue<>(4);
+
+ @Override
+ public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) {
+ return CompletableFuture.completedFuture(new ShuffleDescriptor() {
+ @Override
+ public ResultPartitionID getResultPartitionID() {
+ return new ResultPartitionID(
+ partitionDescriptor.getPartitionId(),
+ producerDescriptor.getProducerExecutionId());
+ }
+
+ @Override
+ public Optional<ResourceID> storesLocalResourcesOn() {
+ return Optional.of(producerDescriptor.getProducerLocation());
+ }
+ });
+ }
+
+ @Override
+ public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
+ externallyReleasedPartitions.add(shuffleDescriptor);
+ }
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index b8c83fe..433e3f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -19,33 +19,16 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
-import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.io.network.partition.TestingPartitionTracker;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotOwner;
@@ -53,10 +36,8 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
-import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
@@ -65,7 +46,6 @@ import org.junit.ClassRule;
import org.junit.Test;
import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Collection;
@@ -74,9 +54,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
-import java.util.function.Consumer;
-import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
@@ -526,111 +504,6 @@ public class ExecutionTest extends TestLogger {
assertThat(returnedSlotFuture.get(), is(equalTo(slotRequestIdFuture.get())));
}
- @Test
- public void testPartitionRetainedWhenFinished() throws Exception {
- testPartitionTrackingForStateTransition(Execution::markFinished, false);
- }
-
- @Test
- public void testPartitionReleasedWhenCanceled() throws Exception {
- testPartitionTrackingForStateTransition(
- execution -> {
- execution.cancel();
- execution.completeCancelling();
- },
- true);
- }
-
- @Test
- public void testPartitionReleasedWhenFailed() throws Exception {
- testPartitionTrackingForStateTransition(execution -> execution.fail(new Exception("Test exception")), true);
- }
-
- private void testPartitionTrackingForStateTransition(final Consumer<Execution> stateTransition, final boolean shouldPartitionBeReleased) throws Exception {
- final JobVertex producerVertex = createNoOpJobVertex();
- final JobVertex consumerVertex = createNoOpJobVertex();
- consumerVertex.connectNewDataSetAsInput(producerVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
-
- final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
-
- final SlotProvider slotProvider = new SlotProvider() {
- @Override
- public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) {
- return CompletableFuture.completedFuture(new SimpleSlot(
- new SingleSlotTestingSlotOwner(),
- taskManagerLocation,
- 0,
- new SimpleAckingTaskManagerGateway()));
- }
-
- @Override
- public void cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
- }
- };
-
- CompletableFuture<Tuple2<ResourceID, ResultPartitionDeploymentDescriptor>> partitionStartTrackingFuture = new CompletableFuture<>();
- CompletableFuture<Collection<ResultPartitionID>> partitionReleaseFuture = new CompletableFuture<>();
- final TestingPartitionTracker partitionTracker = new TestingPartitionTracker();
- partitionTracker.setStartTrackingPartitionsConsumer(
- (resourceID, resultPartitionDeploymentDescriptor) ->
- partitionStartTrackingFuture.complete(Tuple2.of(resourceID, resultPartitionDeploymentDescriptor))
- );
- partitionTracker.setStopTrackingAndReleasePartitionsConsumer(partitionReleaseFuture::complete);
-
- final ExecutionGraph executionGraph = ExecutionGraphBuilder.buildGraph(
- null,
- new JobGraph(new JobID(), "test job", producerVertex, consumerVertex),
- new Configuration(),
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- slotProvider,
- ExecutionTest.class.getClassLoader(),
- new StandaloneCheckpointRecoveryFactory(),
- Time.seconds(10),
- new NoRestartStrategy(),
- new UnregisteredMetricsGroup(),
- VoidBlobWriter.getInstance(),
- Time.seconds(10),
- log,
- NettyShuffleMaster.INSTANCE,
- partitionTracker);
-
- executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
-
- final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(producerVertex.getID());
- final ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
-
- final Execution execution = executionVertex.getCurrentExecutionAttempt();
-
- execution.allocateResourcesForExecution(
- executionGraph.getSlotProviderStrategy(),
- LocationPreferenceConstraint.ALL,
- Collections.emptySet());
-
- assertThat(partitionStartTrackingFuture.isDone(), is(true));
- final Tuple2<ResourceID, ResultPartitionDeploymentDescriptor> startTrackingCall = partitionStartTrackingFuture.get();
-
- final IntermediateResultPartitionID expectedIntermediateResultPartitionId = executionJobVertex
- .getProducedDataSets()[0]
- .getPartitions()[0]
- .getPartitionId();
- final ResultPartitionDeploymentDescriptor descriptor = execution
- .getResultPartitionDeploymentDescriptor(expectedIntermediateResultPartitionId).get();
- assertThat(startTrackingCall.f0, equalTo(taskManagerLocation.getResourceID()));
- assertThat(startTrackingCall.f1, equalTo(descriptor));
-
- execution.deploy();
- execution.switchToRunning();
-
- stateTransition.accept(execution);
-
- assertThat(partitionReleaseFuture.isDone(), is(shouldPartitionBeReleased));
- if (shouldPartitionBeReleased) {
- final Collection<ResultPartitionID> partitionReleaseCall = partitionReleaseFuture.get();
- assertThat(partitionReleaseCall, contains(descriptor.getShuffleDescriptor().getResultPartitionID()));
- }
- }
-
/**
* Tests that a slot release will atomically release the assigned {@link Execution}.
*/
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java
index 0d1a160..2888b31 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java
@@ -43,6 +43,10 @@ public enum NoOpPartitionTracker implements PartitionTracker {
}
@Override
+ public void stopTrackingPartitions(Collection<ResultPartitionID> resultPartitionIds) {
+ }
+
+ @Override
public void stopTrackingAndReleasePartitionsFor(ResourceID producingTaskExecutorId) {
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index 72892d6..b1a58a0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -96,7 +96,7 @@ public enum PartitionTestUtils {
}
}
- static ResultPartitionDeploymentDescriptor createPartitionDeploymentDescriptor(
+ public static ResultPartitionDeploymentDescriptor createPartitionDeploymentDescriptor(
ResultPartitionType partitionType) {
ShuffleDescriptor shuffleDescriptor = NettyShuffleDescriptorBuilder.newBuilder().buildLocal();
PartitionDescriptor partitionDescriptor = new PartitionDescriptor(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java
index 5ca7156..63e4f44 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java
@@ -50,12 +50,12 @@ import static org.junit.Assert.assertEquals;
public class PartitionTrackerImplTest extends TestLogger {
@Test
- public void testReleasedOnConsumptionPartitionIsNotTracked() {
+ public void testPipelinedPartitionIsNotTracked() {
testReleaseOnConsumptionHandling(ResultPartitionType.PIPELINED);
}
@Test
- public void testRetainedOnConsumptionPartitionIsTracked() {
+ public void testBlockingPartitionIsTracked() {
testReleaseOnConsumptionHandling(ResultPartitionType.BLOCKING);
}
@@ -75,8 +75,7 @@ public class PartitionTrackerImplTest extends TestLogger {
resultPartitionType,
false));
- final boolean isTrackingExpected = resultPartitionType == ResultPartitionType.BLOCKING;
- assertThat(partitionTracker.isTrackingPartitionsFor(resourceId), is(isTrackingExpected));
+ assertThat(partitionTracker.isTrackingPartitionsFor(resourceId), is(resultPartitionType.isBlocking()));
}
@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java
index 2d85ff6..6ba333d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java
@@ -36,6 +36,7 @@ public class TestingPartitionTracker implements PartitionTracker {
private Consumer<ResourceID> stopTrackingAndReleaseAllPartitionsConsumer = ignored -> {};
private BiConsumer<ResourceID, ResultPartitionDeploymentDescriptor> startTrackingPartitionsConsumer = (ignoredA, ignoredB) -> {};
private Consumer<Collection<ResultPartitionID>> stopTrackingAndReleasePartitionsConsumer = ignored -> {};
+ private Consumer<Collection<ResultPartitionID>> stopTrackingPartitionsConsumer = ignored -> {};
public void setStartTrackingPartitionsConsumer(BiConsumer<ResourceID, ResultPartitionDeploymentDescriptor> startTrackingPartitionsConsumer) {
this.startTrackingPartitionsConsumer = startTrackingPartitionsConsumer;
@@ -61,6 +62,10 @@ public class TestingPartitionTracker implements PartitionTracker {
this.stopTrackingAndReleasePartitionsConsumer = stopTrackingAndReleasePartitionsConsumer;
}
+ public void setStopTrackingPartitionsConsumer(Consumer<Collection<ResultPartitionID>> stopTrackingPartitionsConsumer) {
+ this.stopTrackingPartitionsConsumer = stopTrackingPartitionsConsumer;
+ }
+
@Override
public void startTrackingPartition(ResourceID producingTaskExecutorId, ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) {
this.startTrackingPartitionsConsumer.accept(producingTaskExecutorId, resultPartitionDeploymentDescriptor);
@@ -77,6 +82,11 @@ public class TestingPartitionTracker implements PartitionTracker {
}
@Override
+ public void stopTrackingPartitions(Collection<ResultPartitionID> resultPartitionIds) {
+ stopTrackingPartitionsConsumer.accept(resultPartitionIds);
+ }
+
+ @Override
public void stopTrackingAndReleasePartitionsFor(ResourceID producingTaskExecutorId) {
stopTrackingAndReleaseAllPartitionsConsumer.accept(producingTaskExecutorId);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index d87cf84..6b6b9e4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvi
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
@@ -90,6 +91,7 @@ import java.util.stream.StreamSupport;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertTrue;
/**
@@ -190,33 +192,62 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
}
@Test
- public void testPartitionReleaseAfterDisconnect() throws Exception {
+ public void testBlockingPartitionReleaseAfterDisconnect() throws Exception {
testPartitionRelease(
(jobId, partitionId, taskExecutorGateway) -> taskExecutorGateway.disconnectJobManager(jobId, new Exception("test")),
- true);
+ true,
+ ResultPartitionType.BLOCKING);
}
@Test
- public void testPartitionReleaseAfterReleaseCall() throws Exception {
+ public void testPipelinedPartitionNotReleasedAfterDisconnect() throws Exception {
+ testPartitionRelease(
+ (jobId, partitionId, taskExecutorGateway) -> taskExecutorGateway.disconnectJobManager(jobId, new Exception("test")),
+ false,
+ ResultPartitionType.PIPELINED);
+ }
+
+ @Test
+ public void testBlockingPartitionReleaseAfterReleaseCall() throws Exception {
testPartitionRelease(
(jobId, partitionId, taskExecutorGateway) -> taskExecutorGateway.releasePartitions(jobId, Collections.singletonList(partitionId)),
- true);
+ true,
+ ResultPartitionType.BLOCKING);
}
@Test
- public void testPartitionReleaseAfterShutdown() throws Exception {
+ public void testPipelinedPartitionReleaseAfterReleaseCall() throws Exception {
+ testPartitionRelease(
+ (jobId, partitionId, taskExecutorGateway) -> taskExecutorGateway.releasePartitions(jobId, Collections.singletonList(partitionId)),
+ true,
+ ResultPartitionType.PIPELINED);
+ }
+
+ @Test
+ public void testBlockingPartitionReleaseAfterShutdown() throws Exception {
// don't do any explicit release action, so that the partition must be cleaned up on shutdown
testPartitionRelease(
(jobId, partitionId, taskExecutorGateway) -> { },
- false);
+ false,
+ ResultPartitionType.BLOCKING);
+ }
+
+ @Test
+ public void testPipelinedPartitionReleaseAfterShutdown() throws Exception {
+ // don't do any explicit release action, so that the partition must be cleaned up on shutdown
+ testPartitionRelease(
+ (jobId, partitionId, taskExecutorGateway) -> { },
+ false,
+ ResultPartitionType.PIPELINED);
}
private void testPartitionRelease(
TriConsumer<JobID, ResultPartitionID, TaskExecutorGateway> releaseAction,
- boolean waitForRelease) throws Exception {
+ boolean waitForRelease,
+ ResultPartitionType resultPartitionType) throws Exception {
final ResultPartitionDeploymentDescriptor taskResultPartitionDescriptor =
- PartitionTestUtils.createPartitionDeploymentDescriptor();
+ PartitionTestUtils.createPartitionDeploymentDescriptor(resultPartitionType);
final ExecutionAttemptID eid1 = taskResultPartitionDescriptor.getShuffleDescriptor().getResultPartitionID().getProducerId();
final TaskDeploymentDescriptor taskDeploymentDescriptor =
@@ -343,7 +374,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
// the task is still running => the partition is in in-progress
runInTaskExecutorThreadAndWait(
taskExecutor,
- () -> assertTrue(partitionTable.hasTrackedPartitions(jobId)));
+ () -> assertThat(partitionTable.hasTrackedPartitions(jobId), is(resultPartitionType.isBlocking())));
TestingInvokable.sync.releaseBlocker();
taskFinishedFuture.get(timeout.getSize(), timeout.getUnit());
@@ -351,7 +382,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
// the task is finished => the partition should be finished now
runInTaskExecutorThreadAndWait(
taskExecutor,
- () -> assertTrue(partitionTable.hasTrackedPartitions(jobId)));
+ () -> assertThat(partitionTable.hasTrackedPartitions(jobId), is(resultPartitionType.isBlocking())));
final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>();
runInTaskExecutorThreadAndWait(