You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/07/24 11:11:38 UTC

[flink] branch release-1.9 updated: [FLINK-13055][runtime] Leverage PartitionTracker for checking partition availability

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new febaee9  [FLINK-13055][runtime] Leverage PartitionTracker for checking partition availability
febaee9 is described below

commit febaee9238e2a1da5feea845a869b837017a2eac
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Wed Jul 24 19:11:08 2019 +0800

    [FLINK-13055][runtime] Leverage PartitionTracker for checking partition availability
---
 .../runtime/executiongraph/ExecutionGraph.java     | 13 +++-
 ...ionGraphResultPartitionAvailabilityChecker.java | 53 ++++++++++++++++
 .../AdaptedRestartPipelinedRegionStrategyNG.java   |  2 +-
 .../flip1/ResultPartitionAvailabilityChecker.java  |  2 +-
 .../io/network/partition/PartitionTracker.java     |  5 ++
 .../io/network/partition/PartitionTrackerImpl.java |  7 +++
 ...inedRegionStrategyNGConcurrentFailoverTest.java | 44 +++++++-------
 ...startPipelinedRegionStrategyNGFailoverTest.java | 10 +++
 ...raphResultPartitionAvailabilityCheckerTest.java | 71 ++++++++++++++++++++++
 .../io/network/partition/NoOpPartitionTracker.java |  5 ++
 .../network/partition/TestingPartitionTracker.java | 10 +++
 11 files changed, 196 insertions(+), 26 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 9ed7b0b..0c85b52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -48,6 +48,7 @@ import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
 import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
 import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.NotReleasingPartitionReleaseStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
 import org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback;
@@ -297,6 +298,8 @@ public class ExecutionGraph implements AccessExecutionGraph {
 
 	private final PartitionTracker partitionTracker;
 
+	private final ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker;
+
 	/**
 	 * Future for an ongoing or completed scheduling action.
 	 */
@@ -513,6 +516,10 @@ public class ExecutionGraph implements AccessExecutionGraph {
 
 		this.partitionTracker = checkNotNull(partitionTracker);
 
+		this.resultPartitionAvailabilityChecker = new ExecutionGraphResultPartitionAvailabilityChecker(
+			this::createResultPartitionId,
+			partitionTracker);
+
 		LOG.info("Job recovers via failover strategy: {}", failoverStrategy.getStrategyName());
 	}
 
@@ -1567,7 +1574,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
 		}
 	}
 
-	private ResultPartitionID createResultPartitionId(final IntermediateResultPartitionID resultPartitionId) {
+	ResultPartitionID createResultPartitionId(final IntermediateResultPartitionID resultPartitionId) {
 		final SchedulingResultPartition schedulingResultPartition = schedulingTopology.getResultPartitionOrThrow(resultPartitionId);
 		final SchedulingExecutionVertex producer = schedulingResultPartition.getProducer();
 		final ExecutionVertexID producerId = producer.getId();
@@ -1743,6 +1750,10 @@ public class ExecutionGraph implements AccessExecutionGraph {
 		return partitionTracker;
 	}
 
+	public ResultPartitionAvailabilityChecker getResultPartitionAvailabilityChecker() {
+		return resultPartitionAvailabilityChecker;
+	}
+
 	PartitionReleaseStrategy getPartitionReleaseStrategy() {
 		return partitionReleaseStrategy;
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphResultPartitionAvailabilityChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphResultPartitionAvailabilityChecker.java
new file mode 100644
index 0000000..2ec0a34
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphResultPartitionAvailabilityChecker.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link ResultPartitionAvailabilityChecker} which decides the intermediate result partition availability
+ * based on whether the corresponding result partition in the execution graph is tracked.
+ */
+public class ExecutionGraphResultPartitionAvailabilityChecker implements ResultPartitionAvailabilityChecker {
+
+	/** The function that maps an IntermediateResultPartitionID to a ResultPartitionID. */
+	private final Function<IntermediateResultPartitionID, ResultPartitionID> partitionIDMapper;
+
+	/** The tracker that tracks all available result partitions. */
+	private final PartitionTracker partitionTracker;
+
+	ExecutionGraphResultPartitionAvailabilityChecker(
+			final Function<IntermediateResultPartitionID, ResultPartitionID> partitionIDMapper,
+			final PartitionTracker partitionTracker) {
+
+		this.partitionIDMapper = checkNotNull(partitionIDMapper);
+		this.partitionTracker = checkNotNull(partitionTracker);
+	}
+
+	@Override
+	public boolean isAvailable(final IntermediateResultPartitionID resultPartitionID) {
+		return partitionTracker.isPartitionTracked(partitionIDMapper.apply(resultPartitionID));
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
index 337534f..ddb60af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
@@ -290,7 +290,7 @@ public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
 		// currently it's safe to add it here, as this method is invoked only once in production code.
 		checkState(restartPipelinedRegionStrategy == null, "notifyNewVertices() must be called only once");
 		this.restartPipelinedRegionStrategy = new RestartPipelinedRegionStrategy(
-			new DefaultFailoverTopology(executionGraph));
+			new DefaultFailoverTopology(executionGraph), executionGraph.getResultPartitionAvailabilityChecker());
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ResultPartitionAvailabilityChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ResultPartitionAvailabilityChecker.java
index 286ad3e..b10724d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ResultPartitionAvailabilityChecker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ResultPartitionAvailabilityChecker.java
@@ -22,7 +22,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 /**
  * This checker helps to query result partition availability.
  */
-interface ResultPartitionAvailabilityChecker {
+public interface ResultPartitionAvailabilityChecker {
 
 	/**
 	 * Returns whether the given partition is available.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
index 4c432f1..3697fb8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
@@ -54,4 +54,9 @@ public interface PartitionTracker {
 	 * Returns whether any partition is being tracked for the given task executor ID.
 	 */
 	boolean isTrackingPartitionsFor(ResourceID producingTaskExecutorId);
+
+	/**
+	 * Returns whether the given partition is being tracked.
+	 */
+	boolean isPartitionTracked(ResultPartitionID resultPartitionID);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
index 53e7d3f..c52e8b1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
@@ -122,6 +122,13 @@ public class PartitionTrackerImpl implements PartitionTracker {
 		return partitionTable.hasTrackedPartitions(producingTaskExecutorId);
 	}
 
+	@Override
+	public boolean isPartitionTracked(final ResultPartitionID resultPartitionID) {
+		Preconditions.checkNotNull(resultPartitionID);
+
+		return partitionInfos.containsKey(resultPartitionID);
+	}
+
 	private Optional<PartitionInfo> internalStopTrackingPartition(ResultPartitionID resultPartitionId) {
 		final PartitionInfo partitionInfo = partitionInfos.remove(resultPartitionId);
 		if (partitionInfo == null) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
index f5461f0..b23ec26 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
@@ -19,8 +19,6 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
@@ -29,12 +27,14 @@ import org.apache.flink.runtime.executiongraph.AdaptedRestartPipelinedRegionStra
 import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.PartitionTrackerImpl;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.TestLogger;
 
@@ -42,6 +42,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Iterator;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import static org.junit.Assert.assertEquals;
@@ -250,34 +251,31 @@ public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest exten
 	 */
 	private ExecutionGraph createExecutionGraph() throws Exception {
 
-		final JobInformation jobInformation = new DummyJobInformation(TEST_JOB_ID, "test job");
-		final SimpleSlotProvider slotProvider = new SimpleSlotProvider(TEST_JOB_ID, DEFAULT_PARALLELISM);
-
-		final Time timeout = Time.seconds(10L);
-		final ExecutionGraph graph = new ExecutionGraph(
-			jobInformation,
-			TestingUtils.defaultExecutor(),
-			TestingUtils.defaultExecutor(),
-			timeout,
-			manuallyTriggeredRestartStrategy,
-			TestAdaptedRestartPipelinedRegionStrategyNG::new,
-			slotProvider,
-			getClass().getClassLoader(),
-			VoidBlobWriter.getInstance(),
-			timeout);
-
-		JobVertex v1 = new JobVertex("vertex1");
+		final JobVertex v1 = new JobVertex("vertex1");
 		v1.setInvokableClass(NoOpInvokable.class);
 		v1.setParallelism(DEFAULT_PARALLELISM);
 
-		JobVertex v2 = new JobVertex("vertex2");
+		final JobVertex v2 = new JobVertex("vertex2");
 		v2.setInvokableClass(NoOpInvokable.class);
 		v2.setParallelism(DEFAULT_PARALLELISM);
 
 		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
 
-		JobGraph jg = new JobGraph(TEST_JOB_ID, "testjob", v1, v2);
-		graph.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
+		final JobGraph jg = new JobGraph(TEST_JOB_ID, "testjob", v1, v2);
+
+		final SimpleSlotProvider slotProvider = new SimpleSlotProvider(TEST_JOB_ID, DEFAULT_PARALLELISM);
+
+		final PartitionTracker partitionTracker = new PartitionTrackerImpl(
+			jg.getJobID(),
+			NettyShuffleMaster.INSTANCE,
+			ignored -> Optional.empty());
+
+		final ExecutionGraph graph = new ExecutionGraphTestUtils.TestingExecutionGraphBuilder(jg)
+			.setRestartStrategy(manuallyTriggeredRestartStrategy)
+			.setFailoverStrategyFactory(TestAdaptedRestartPipelinedRegionStrategyNG::new)
+			.setSlotProvider(slotProvider)
+			.setPartitionTracker(partitionTracker)
+			.build();
 
 		graph.start(componentMainThreadExecutor);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
index 3c1ca2b..acf9869 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
@@ -33,6 +33,8 @@ import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.PartitionTrackerImpl;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
@@ -47,6 +49,7 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Before;
@@ -57,6 +60,7 @@ import javax.annotation.Nullable;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
@@ -379,10 +383,16 @@ public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLog
 			final JobGraph jobGraph,
 			final RestartStrategy restartStrategy) throws Exception {
 
+		final PartitionTracker partitionTracker = new PartitionTrackerImpl(
+			jobGraph.getJobID(),
+			NettyShuffleMaster.INSTANCE,
+			ignored -> Optional.empty());
+
 		final ExecutionGraph eg = new ExecutionGraphTestUtils.TestingExecutionGraphBuilder(jobGraph)
 			.setRestartStrategy(restartStrategy)
 			.setFailoverStrategyFactory(TestAdaptedRestartPipelinedRegionStrategyNG::new)
 			.setSlotProvider(slotProvider)
+			.setPartitionTracker(partitionTracker)
 			.build();
 
 		eg.start(componentMainThreadExecutor);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphResultPartitionAvailabilityCheckerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphResultPartitionAvailabilityCheckerTest.java
new file mode 100644
index 0000000..ccbd439
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphResultPartitionAvailabilityCheckerTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.TestingPartitionTracker;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ExecutionGraphResultPartitionAvailabilityChecker}.
+ */
+public class ExecutionGraphResultPartitionAvailabilityCheckerTest extends TestLogger {
+
+	@Test
+	public void testPartitionAvailabilityCheck() {
+
+		final IntermediateResultPartitionID irp1ID = new IntermediateResultPartitionID();
+		final IntermediateResultPartitionID irp2ID = new IntermediateResultPartitionID();
+		final IntermediateResultPartitionID irp3ID = new IntermediateResultPartitionID();
+		final IntermediateResultPartitionID irp4ID = new IntermediateResultPartitionID();
+
+		final Map<IntermediateResultPartitionID, Boolean> expectedAvailability =
+			new HashMap<IntermediateResultPartitionID, Boolean>() {{
+				put(irp1ID, true);
+				put(irp2ID, false);
+				put(irp3ID, false);
+				put(irp4ID, true);
+			}};
+
+		// let the partition tracker respect the expected availability result
+		final TestingPartitionTracker partitionTracker = new TestingPartitionTracker();
+		partitionTracker.setIsPartitionTrackedFunction(rpID -> expectedAvailability.get(rpID.getPartitionId()));
+
+		// the execution attempt ID should make no difference in this case
+		final Function<IntermediateResultPartitionID, ResultPartitionID> partitionIDMapper =
+			intermediateResultPartitionID -> new ResultPartitionID(intermediateResultPartitionID, new ExecutionAttemptID());
+
+		final ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker =
+			new ExecutionGraphResultPartitionAvailabilityChecker(partitionIDMapper, partitionTracker);
+
+		for (IntermediateResultPartitionID irpID : expectedAvailability.keySet()) {
+			assertEquals(expectedAvailability.get(irpID), resultPartitionAvailabilityChecker.isAvailable(irpID));
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java
index 30895b2..0d1a160 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java
@@ -50,4 +50,9 @@ public enum NoOpPartitionTracker implements PartitionTracker {
 	public boolean isTrackingPartitionsFor(ResourceID producingTaskExecutorId) {
 		return false;
 	}
+
+	@Override
+	public boolean isPartitionTracked(final ResultPartitionID resultPartitionID) {
+		return false;
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java
index 2bcda7e..2d85ff6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java
@@ -31,6 +31,7 @@ import java.util.function.Function;
 public class TestingPartitionTracker implements PartitionTracker {
 
 	private Function<ResourceID, Boolean> isTrackingPartitionsForFunction = ignored -> false;
+	private Function<ResultPartitionID, Boolean> isPartitionTrackedFunction = ignored -> false;
 	private Consumer<ResourceID> stopTrackingAllPartitionsConsumer = ignored -> {};
 	private Consumer<ResourceID> stopTrackingAndReleaseAllPartitionsConsumer = ignored -> {};
 	private BiConsumer<ResourceID, ResultPartitionDeploymentDescriptor> startTrackingPartitionsConsumer = (ignoredA, ignoredB) -> {};
@@ -44,6 +45,10 @@ public class TestingPartitionTracker implements PartitionTracker {
 		this.isTrackingPartitionsForFunction = isTrackingPartitionsForFunction;
 	}
 
+	public void setIsPartitionTrackedFunction(Function<ResultPartitionID, Boolean> isPartitionTrackedFunction) {
+		this.isPartitionTrackedFunction = isPartitionTrackedFunction;
+	}
+
 	public void setStopTrackingAllPartitionsConsumer(Consumer<ResourceID> stopTrackingAllPartitionsConsumer) {
 		this.stopTrackingAllPartitionsConsumer = stopTrackingAllPartitionsConsumer;
 	}
@@ -80,4 +85,9 @@ public class TestingPartitionTracker implements PartitionTracker {
 	public boolean isTrackingPartitionsFor(ResourceID producingTaskExecutorId) {
 		return isTrackingPartitionsForFunction.apply(producingTaskExecutorId);
 	}
+
+	@Override
+	public boolean isPartitionTracked(final ResultPartitionID resultPartitionID) {
+		return isPartitionTrackedFunction.apply(resultPartitionID);
+	}
 }