You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/06/18 07:47:06 UTC
flink git commit: [FLINK-9601][state] Make sure that the copied array
in `CopyOnWriteStateTable#snapshotTableArrays()` is big enough to hold all
(flattened) entries
Repository: flink
Updated Branches:
refs/heads/master ae9178fa0 -> 0e9b066aa
[FLINK-9601][state] Make sure that the copied array in `CopyOnWriteStateTable#snapshotTableArrays()` is big enough to hold all (flattened) entries
This closes #6174.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0e9b066a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0e9b066a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0e9b066a
Branch: refs/heads/master
Commit: 0e9b066aaba405f58b807c5897213048cfa8319a
Parents: ae9178f
Author: sihuazhou <su...@163.com>
Authored: Sat Jun 16 18:22:49 2018 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Mon Jun 18 09:45:00 2018 +0200
----------------------------------------------------------------------
.../runtime/state/heap/CopyOnWriteStateTable.java | 18 +++++++++++++-----
.../state/heap/CopyOnWriteStateTableSnapshot.java | 2 ++
2 files changed, 15 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0e9b066a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
index bb90c37..10d5636 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.NoSuchElementException;
@@ -605,11 +604,20 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen
}
StateTableEntry<K, N, S>[] table = primaryTable;
+
+ // In order to reuse the copied array as the destination array for the partitioned records in
+ // CopyOnWriteStateTableSnapshot#partitionByKeyGroup(), we need to make sure that the copied array
+ // is big enough to hold the flattened entries. In fact, given the current rehashing algorithm, we only
+ // need to do this check when isRehashing() is false, but in order to get a more robust code(in case that
+ // the rehashing algorithm may changed in the future), we do this check for all the case.
+ final int totalTableIndexSize = rehashIndex + table.length;
+ final int copiedArraySize = Math.max(totalTableIndexSize, size());
+ final StateTableEntry<K, N, S>[] copy = new StateTableEntry[copiedArraySize];
+
if (isRehashing()) {
// consider both tables for the snapshot, the rehash index tells us which part of the two tables we need
final int localRehashIndex = rehashIndex;
final int localCopyLength = table.length - localRehashIndex;
- StateTableEntry<K, N, S>[] copy = new StateTableEntry[localRehashIndex + table.length];
// for the primary table, take every index >= rhIdx.
System.arraycopy(table, localRehashIndex, copy, 0, localCopyLength);
@@ -618,12 +626,12 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen
table = incrementalRehashTable;
System.arraycopy(table, 0, copy, localCopyLength, localRehashIndex);
System.arraycopy(table, table.length >>> 1, copy, localCopyLength + localRehashIndex, localRehashIndex);
-
- return copy;
} else {
// we only need to copy the primary table
- return Arrays.copyOf(table, table.length);
+ System.arraycopy(table, 0, copy, 0, table.length);
}
+
+ return copy;
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/0e9b066a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
index cf0056e..4c0ab6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
@@ -196,6 +196,8 @@ public class CopyOnWriteStateTableSnapshot<K, N, S>
super(
new CopyOnWriteStateTable.StateTableEntry[stateTableSize],
stateTableSize,
+ // We have made sure that the snapshotData is big enough to hold the flattened entries in
+ // CopyOnWriteStateTable#snapshotTableArrays(), we can safely reuse it as the destination array here.
snapshotData,
keyGroupRange,
totalKeyGroups,