You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/07/24 11:11:19 UTC
[flink] branch master updated: [FLINK-13055][runtime] Leverage
PartitionTracker for checking partition availability
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4fb4b63 [FLINK-13055][runtime] Leverage PartitionTracker for checking partition availability
4fb4b63 is described below
commit 4fb4b6318411fabf7d6167f02ad834e85ad66845
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Wed Jul 24 19:11:08 2019 +0800
[FLINK-13055][runtime] Leverage PartitionTracker for checking partition availability
---
.../runtime/executiongraph/ExecutionGraph.java | 13 +++-
...ionGraphResultPartitionAvailabilityChecker.java | 53 ++++++++++++++++
.../AdaptedRestartPipelinedRegionStrategyNG.java | 2 +-
.../flip1/ResultPartitionAvailabilityChecker.java | 2 +-
.../io/network/partition/PartitionTracker.java | 5 ++
.../io/network/partition/PartitionTrackerImpl.java | 7 +++
...inedRegionStrategyNGConcurrentFailoverTest.java | 44 +++++++-------
...startPipelinedRegionStrategyNGFailoverTest.java | 10 +++
...raphResultPartitionAvailabilityCheckerTest.java | 71 ++++++++++++++++++++++
.../io/network/partition/NoOpPartitionTracker.java | 5 ++
.../network/partition/TestingPartitionTracker.java | 10 +++
11 files changed, 196 insertions(+), 26 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 9ed7b0b..0c85b52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -48,6 +48,7 @@ import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.NotReleasingPartitionReleaseStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
import org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback;
@@ -297,6 +298,8 @@ public class ExecutionGraph implements AccessExecutionGraph {
private final PartitionTracker partitionTracker;
+ private final ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker;
+
/**
* Future for an ongoing or completed scheduling action.
*/
@@ -513,6 +516,10 @@ public class ExecutionGraph implements AccessExecutionGraph {
this.partitionTracker = checkNotNull(partitionTracker);
+ this.resultPartitionAvailabilityChecker = new ExecutionGraphResultPartitionAvailabilityChecker(
+ this::createResultPartitionId,
+ partitionTracker);
+
LOG.info("Job recovers via failover strategy: {}", failoverStrategy.getStrategyName());
}
@@ -1567,7 +1574,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
}
}
- private ResultPartitionID createResultPartitionId(final IntermediateResultPartitionID resultPartitionId) {
+ ResultPartitionID createResultPartitionId(final IntermediateResultPartitionID resultPartitionId) {
final SchedulingResultPartition schedulingResultPartition = schedulingTopology.getResultPartitionOrThrow(resultPartitionId);
final SchedulingExecutionVertex producer = schedulingResultPartition.getProducer();
final ExecutionVertexID producerId = producer.getId();
@@ -1743,6 +1750,10 @@ public class ExecutionGraph implements AccessExecutionGraph {
return partitionTracker;
}
+ public ResultPartitionAvailabilityChecker getResultPartitionAvailabilityChecker() {
+ return resultPartitionAvailabilityChecker;
+ }
+
PartitionReleaseStrategy getPartitionReleaseStrategy() {
return partitionReleaseStrategy;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphResultPartitionAvailabilityChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphResultPartitionAvailabilityChecker.java
new file mode 100644
index 0000000..2ec0a34
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphResultPartitionAvailabilityChecker.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link ResultPartitionAvailabilityChecker} which treats an intermediate result partition as available
+ * if and only if the corresponding result partition is currently tracked by the {@link PartitionTracker}.
+ */
+public class ExecutionGraphResultPartitionAvailabilityChecker implements ResultPartitionAvailabilityChecker {
+
+ /** Function that maps an IntermediateResultPartitionID to its corresponding ResultPartitionID. */
+ private final Function<IntermediateResultPartitionID, ResultPartitionID> partitionIDMapper;
+
+ /** The tracker that tracks all available result partitions. */
+ private final PartitionTracker partitionTracker;
+
+ ExecutionGraphResultPartitionAvailabilityChecker(
+ final Function<IntermediateResultPartitionID, ResultPartitionID> partitionIDMapper,
+ final PartitionTracker partitionTracker) {
+
+ this.partitionIDMapper = checkNotNull(partitionIDMapper);
+ this.partitionTracker = checkNotNull(partitionTracker);
+ }
+
+ @Override
+ public boolean isAvailable(final IntermediateResultPartitionID resultPartitionID) {
+ return partitionTracker.isPartitionTracked(partitionIDMapper.apply(resultPartitionID));
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
index 337534f..ddb60af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
@@ -290,7 +290,7 @@ public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
// currently it's safe to add it here, as this method is invoked only once in production code.
checkState(restartPipelinedRegionStrategy == null, "notifyNewVertices() must be called only once");
this.restartPipelinedRegionStrategy = new RestartPipelinedRegionStrategy(
- new DefaultFailoverTopology(executionGraph));
+ new DefaultFailoverTopology(executionGraph), executionGraph.getResultPartitionAvailabilityChecker());
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ResultPartitionAvailabilityChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ResultPartitionAvailabilityChecker.java
index 286ad3e..b10724d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ResultPartitionAvailabilityChecker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ResultPartitionAvailabilityChecker.java
@@ -22,7 +22,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
/**
* This checker helps to query result partition availability.
*/
-interface ResultPartitionAvailabilityChecker {
+public interface ResultPartitionAvailabilityChecker {
/**
* Returns whether the given partition is available.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
index 4c432f1..3697fb8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
@@ -54,4 +54,9 @@ public interface PartitionTracker {
* Returns whether any partition is being tracked for the given task executor ID.
*/
boolean isTrackingPartitionsFor(ResourceID producingTaskExecutorId);
+
+ /**
+ * Returns whether the given partition is being tracked.
+ */
+ boolean isPartitionTracked(ResultPartitionID resultPartitionID);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
index 53e7d3f..c52e8b1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
@@ -122,6 +122,13 @@ public class PartitionTrackerImpl implements PartitionTracker {
return partitionTable.hasTrackedPartitions(producingTaskExecutorId);
}
+ @Override
+ public boolean isPartitionTracked(final ResultPartitionID resultPartitionID) {
+ Preconditions.checkNotNull(resultPartitionID);
+
+ return partitionInfos.containsKey(resultPartitionID);
+ }
+
private Optional<PartitionInfo> internalStopTrackingPartition(ResultPartitionID resultPartitionId) {
final PartitionInfo partitionInfo = partitionInfos.remove(resultPartitionId);
if (partitionInfo == null) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
index f5461f0..b23ec26 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
@@ -19,8 +19,6 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
@@ -29,12 +27,14 @@ import org.apache.flink.runtime.executiongraph.AdaptedRestartPipelinedRegionStra
import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.PartitionTrackerImpl;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
@@ -42,6 +42,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.Iterator;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import static org.junit.Assert.assertEquals;
@@ -250,34 +251,31 @@ public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest exten
*/
private ExecutionGraph createExecutionGraph() throws Exception {
- final JobInformation jobInformation = new DummyJobInformation(TEST_JOB_ID, "test job");
- final SimpleSlotProvider slotProvider = new SimpleSlotProvider(TEST_JOB_ID, DEFAULT_PARALLELISM);
-
- final Time timeout = Time.seconds(10L);
- final ExecutionGraph graph = new ExecutionGraph(
- jobInformation,
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- timeout,
- manuallyTriggeredRestartStrategy,
- TestAdaptedRestartPipelinedRegionStrategyNG::new,
- slotProvider,
- getClass().getClassLoader(),
- VoidBlobWriter.getInstance(),
- timeout);
-
- JobVertex v1 = new JobVertex("vertex1");
+ final JobVertex v1 = new JobVertex("vertex1");
v1.setInvokableClass(NoOpInvokable.class);
v1.setParallelism(DEFAULT_PARALLELISM);
- JobVertex v2 = new JobVertex("vertex2");
+ final JobVertex v2 = new JobVertex("vertex2");
v2.setInvokableClass(NoOpInvokable.class);
v2.setParallelism(DEFAULT_PARALLELISM);
v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
- JobGraph jg = new JobGraph(TEST_JOB_ID, "testjob", v1, v2);
- graph.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
+ final JobGraph jg = new JobGraph(TEST_JOB_ID, "testjob", v1, v2);
+
+ final SimpleSlotProvider slotProvider = new SimpleSlotProvider(TEST_JOB_ID, DEFAULT_PARALLELISM);
+
+ final PartitionTracker partitionTracker = new PartitionTrackerImpl(
+ jg.getJobID(),
+ NettyShuffleMaster.INSTANCE,
+ ignored -> Optional.empty());
+
+ final ExecutionGraph graph = new ExecutionGraphTestUtils.TestingExecutionGraphBuilder(jg)
+ .setRestartStrategy(manuallyTriggeredRestartStrategy)
+ .setFailoverStrategyFactory(TestAdaptedRestartPipelinedRegionStrategyNG::new)
+ .setSlotProvider(slotProvider)
+ .setPartitionTracker(partitionTracker)
+ .build();
graph.start(componentMainThreadExecutor);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
index 3c1ca2b..acf9869 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
@@ -33,6 +33,8 @@ import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.PartitionTrackerImpl;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
@@ -47,6 +49,7 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.util.TestLogger;
import org.junit.Before;
@@ -57,6 +60,7 @@ import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
@@ -379,10 +383,16 @@ public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLog
final JobGraph jobGraph,
final RestartStrategy restartStrategy) throws Exception {
+ final PartitionTracker partitionTracker = new PartitionTrackerImpl(
+ jobGraph.getJobID(),
+ NettyShuffleMaster.INSTANCE,
+ ignored -> Optional.empty());
+
final ExecutionGraph eg = new ExecutionGraphTestUtils.TestingExecutionGraphBuilder(jobGraph)
.setRestartStrategy(restartStrategy)
.setFailoverStrategyFactory(TestAdaptedRestartPipelinedRegionStrategyNG::new)
.setSlotProvider(slotProvider)
+ .setPartitionTracker(partitionTracker)
.build();
eg.start(componentMainThreadExecutor);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphResultPartitionAvailabilityCheckerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphResultPartitionAvailabilityCheckerTest.java
new file mode 100644
index 0000000..ccbd439
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphResultPartitionAvailabilityCheckerTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.TestingPartitionTracker;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ExecutionGraphResultPartitionAvailabilityChecker}.
+ */
+public class ExecutionGraphResultPartitionAvailabilityCheckerTest extends TestLogger {
+
+ @Test
+ public void testPartitionAvailabilityCheck() {
+
+ final IntermediateResultPartitionID irp1ID = new IntermediateResultPartitionID();
+ final IntermediateResultPartitionID irp2ID = new IntermediateResultPartitionID();
+ final IntermediateResultPartitionID irp3ID = new IntermediateResultPartitionID();
+ final IntermediateResultPartitionID irp4ID = new IntermediateResultPartitionID();
+
+ final Map<IntermediateResultPartitionID, Boolean> expectedAvailability =
+ new HashMap<IntermediateResultPartitionID, Boolean>() {{
+ put(irp1ID, true);
+ put(irp2ID, false);
+ put(irp3ID, false);
+ put(irp4ID, true);
+ }};
+
+ // make the partition tracker report each partition as tracked according to the expected availability
+ final TestingPartitionTracker partitionTracker = new TestingPartitionTracker();
+ partitionTracker.setIsPartitionTrackedFunction(rpID -> expectedAvailability.get(rpID.getPartitionId()));
+
+ // the execution attempt ID should make no difference in this case
+ final Function<IntermediateResultPartitionID, ResultPartitionID> partitionIDMapper =
+ intermediateResultPartitionID -> new ResultPartitionID(intermediateResultPartitionID, new ExecutionAttemptID());
+
+ final ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker =
+ new ExecutionGraphResultPartitionAvailabilityChecker(partitionIDMapper, partitionTracker);
+
+ for (IntermediateResultPartitionID irpID : expectedAvailability.keySet()) {
+ assertEquals(expectedAvailability.get(irpID), resultPartitionAvailabilityChecker.isAvailable(irpID));
+ }
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java
index 30895b2..0d1a160 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java
@@ -50,4 +50,9 @@ public enum NoOpPartitionTracker implements PartitionTracker {
public boolean isTrackingPartitionsFor(ResourceID producingTaskExecutorId) {
return false;
}
+
+ @Override
+ public boolean isPartitionTracked(final ResultPartitionID resultPartitionID) {
+ return false;
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java
index 2bcda7e..2d85ff6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java
@@ -31,6 +31,7 @@ import java.util.function.Function;
public class TestingPartitionTracker implements PartitionTracker {
private Function<ResourceID, Boolean> isTrackingPartitionsForFunction = ignored -> false;
+ private Function<ResultPartitionID, Boolean> isPartitionTrackedFunction = ignored -> false;
private Consumer<ResourceID> stopTrackingAllPartitionsConsumer = ignored -> {};
private Consumer<ResourceID> stopTrackingAndReleaseAllPartitionsConsumer = ignored -> {};
private BiConsumer<ResourceID, ResultPartitionDeploymentDescriptor> startTrackingPartitionsConsumer = (ignoredA, ignoredB) -> {};
@@ -44,6 +45,10 @@ public class TestingPartitionTracker implements PartitionTracker {
this.isTrackingPartitionsForFunction = isTrackingPartitionsForFunction;
}
+ public void setIsPartitionTrackedFunction(Function<ResultPartitionID, Boolean> isPartitionTrackedFunction) {
+ this.isPartitionTrackedFunction = isPartitionTrackedFunction;
+ }
+
public void setStopTrackingAllPartitionsConsumer(Consumer<ResourceID> stopTrackingAllPartitionsConsumer) {
this.stopTrackingAllPartitionsConsumer = stopTrackingAllPartitionsConsumer;
}
@@ -80,4 +85,9 @@ public class TestingPartitionTracker implements PartitionTracker {
public boolean isTrackingPartitionsFor(ResourceID producingTaskExecutorId) {
return isTrackingPartitionsForFunction.apply(producingTaskExecutorId);
}
+
+ @Override
+ public boolean isPartitionTracked(final ResultPartitionID resultPartitionID) {
+ return isPartitionTrackedFunction.apply(resultPartitionID);
+ }
}