You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/08/02 07:39:20 UTC

[flink] 01/02: [hotfix][coordination] Check whether partition set to track is empty

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fc72a7c95b4d745e2df64bd75857e77f4d5ca14a
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Jul 26 12:49:45 2019 +0200

    [hotfix][coordination] Check whether partition set to track is empty
---
 .../flink/runtime/taskexecutor/partition/PartitionTable.java     | 4 ++++
 .../flink/runtime/taskexecutor/partition/PartitionTableTest.java | 9 +++++++++
 2 files changed, 13 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
index 02942cf..d214e09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
@@ -51,6 +51,10 @@ public class PartitionTable<K> {
 		Preconditions.checkNotNull(key);
 		Preconditions.checkNotNull(newPartitionIds);
 
+		if (newPartitionIds.isEmpty()) {
+			return;
+		}
+
 		trackedPartitionsPerJob.compute(key, (ignored, partitionIds) -> {
 			if (partitionIds == null) {
 				partitionIds = new HashSet<>(8);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java
index 4e54af1..e2f63fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java
@@ -63,6 +63,15 @@ public class PartitionTableTest extends TestLogger {
 	}
 
 	@Test
+	public void testStartTrackingZeroPartitionDoesNotMutateState() {
+		final PartitionTable<JobID> table = new PartitionTable<>();
+
+		table.startTrackingPartitions(JOB_ID, Collections.emptyList());
+
+		assertFalse(table.hasTrackedPartitions(JOB_ID));
+	}
+
+	@Test
 	public void testStopTrackingAllPartitions() {
 		final PartitionTable<JobID> table = new PartitionTable<>();