You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/08/02 07:39:20 UTC
[flink] 01/02: [hotfix][coordination] Check whether partition set
to track is empty
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit fc72a7c95b4d745e2df64bd75857e77f4d5ca14a
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Jul 26 12:49:45 2019 +0200
[hotfix][coordination] Check whether partition set to track is empty
---
.../flink/runtime/taskexecutor/partition/PartitionTable.java | 4 ++++
.../flink/runtime/taskexecutor/partition/PartitionTableTest.java | 9 +++++++++
2 files changed, 13 insertions(+)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
index 02942cf..d214e09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
@@ -51,6 +51,10 @@ public class PartitionTable<K> {
Preconditions.checkNotNull(key);
Preconditions.checkNotNull(newPartitionIds);
+ if (newPartitionIds.isEmpty()) {
+ return;
+ }
+
trackedPartitionsPerJob.compute(key, (ignored, partitionIds) -> {
if (partitionIds == null) {
partitionIds = new HashSet<>(8);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java
index 4e54af1..e2f63fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java
@@ -63,6 +63,15 @@ public class PartitionTableTest extends TestLogger {
}
@Test
+ public void testStartTrackingZeroPartitionDoesNotMutateState() {
+ final PartitionTable<JobID> table = new PartitionTable<>();
+
+ table.startTrackingPartitions(JOB_ID, Collections.emptyList());
+
+ assertFalse(table.hasTrackedPartitions(JOB_ID));
+ }
+
+ @Test
public void testStopTrackingAllPartitions() {
final PartitionTable<JobID> table = new PartitionTable<>();