You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/06/18 07:47:06 UTC

flink git commit: [FLINK-9601][state] Make sure that the copied array in `CopyOnWriteStateTable#snapshotTableArrays()` is big enough to hold all (flattened) entries

Repository: flink
Updated Branches:
  refs/heads/master ae9178fa0 -> 0e9b066aa


[FLINK-9601][state] Make sure that the copied array in `CopyOnWriteStateTable#snapshotTableArrays()` is big enough to hold all (flattened) entries

This closes #6174.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0e9b066a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0e9b066a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0e9b066a

Branch: refs/heads/master
Commit: 0e9b066aaba405f58b807c5897213048cfa8319a
Parents: ae9178f
Author: sihuazhou <su...@163.com>
Authored: Sat Jun 16 18:22:49 2018 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Mon Jun 18 09:45:00 2018 +0200

----------------------------------------------------------------------
 .../runtime/state/heap/CopyOnWriteStateTable.java | 18 +++++++++++++-----
 .../state/heap/CopyOnWriteStateTableSnapshot.java |  2 ++
 2 files changed, 15 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0e9b066a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
index bb90c37..10d5636 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import java.util.Arrays;
 import java.util.ConcurrentModificationException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
@@ -605,11 +604,20 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen
 		}
 
 		StateTableEntry<K, N, S>[] table = primaryTable;
+
+		// In order to reuse the copied array as the destination array for the partitioned records in
+		// CopyOnWriteStateTableSnapshot#partitionByKeyGroup(), we need to make sure that the copied array
+		// is big enough to hold the flattened entries. In fact, given the current rehashing algorithm, we only
+		// need to do this check when isRehashing() is false, but in order to get a more robust code(in case that
+		// the rehashing algorithm may changed in the future), we do this check for all the case.
+		final int totalTableIndexSize = rehashIndex + table.length;
+		final int copiedArraySize = Math.max(totalTableIndexSize, size());
+		final StateTableEntry<K, N, S>[] copy = new StateTableEntry[copiedArraySize];
+
 		if (isRehashing()) {
 			// consider both tables for the snapshot, the rehash index tells us which part of the two tables we need
 			final int localRehashIndex = rehashIndex;
 			final int localCopyLength = table.length - localRehashIndex;
-			StateTableEntry<K, N, S>[] copy = new StateTableEntry[localRehashIndex + table.length];
 			// for the primary table, take every index >= rhIdx.
 			System.arraycopy(table, localRehashIndex, copy, 0, localCopyLength);
 
@@ -618,12 +626,12 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen
 			table = incrementalRehashTable;
 			System.arraycopy(table, 0, copy, localCopyLength, localRehashIndex);
 			System.arraycopy(table, table.length >>> 1, copy, localCopyLength + localRehashIndex, localRehashIndex);
-
-			return copy;
 		} else {
 			// we only need to copy the primary table
-			return Arrays.copyOf(table, table.length);
+			System.arraycopy(table, 0, copy, 0, table.length);
 		}
+
+		return copy;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0e9b066a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
index cf0056e..4c0ab6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
@@ -196,6 +196,8 @@ public class CopyOnWriteStateTableSnapshot<K, N, S>
 			super(
 				new CopyOnWriteStateTable.StateTableEntry[stateTableSize],
 				stateTableSize,
+				// We have made sure that the snapshotData is big enough to hold the flattened entries in
+				// CopyOnWriteStateTable#snapshotTableArrays(), we can safely reuse it as the destination array here.
 				snapshotData,
 				keyGroupRange,
 				totalKeyGroups,