You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/09 03:09:44 UTC
[flink] branch release-1.11 updated: [FLINK-17625][table-runtime-blink] Fix ArrayIndexOutOfBoundsException in AppendOnlyTopNFunction
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 11958b0 [FLINK-17625][table-runtime-blink] Fix ArrayIndexOutOfBoundsException in AppendOnlyTopNFunction
11958b0 is described below
commit 11958b0dd424c143184ed8fd5a2943049b27a4b4
Author: lsy <ld...@163.com>
AuthorDate: Tue Jun 9 11:04:59 2020 +0800
[FLINK-17625][table-runtime-blink] Fix ArrayIndexOutOfBoundsException in AppendOnlyTopNFunction
This closes #12303
---
.../operators/rank/AppendOnlyTopNFunction.java | 16 +++--
.../table/runtime/operators/rank/TopNBuffer.java | 83 +++++++++++++++-------
2 files changed, 68 insertions(+), 31 deletions(-)
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.java
index 6a3cf5a..82e2fae 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.java
@@ -211,16 +211,22 @@ public class AppendOnlyTopNFunction extends AbstractTopNFunction {
if (buffer.getCurrentTopNum() > rankEnd) {
Map.Entry<RowData, Collection<RowData>> lastEntry = buffer.lastEntry();
RowData lastKey = lastEntry.getKey();
- List<RowData> lastList = (List<RowData>) lastEntry.getValue();
+ Collection<RowData> lastList = lastEntry.getValue();
+ RowData lastElement = buffer.lastElement();
+ int size = lastList.size();
// remove last one
- RowData lastElement = lastList.remove(lastList.size() - 1);
- if (lastList.isEmpty()) {
+ if (size <= 1) {
buffer.removeAll(lastKey);
dataState.remove(lastKey);
} else {
- dataState.put(lastKey, lastList);
+ buffer.removeLast();
+ // last element has been removed from lastList, we have to copy a new collection
+ // for lastList to avoid mutating state values, see CopyOnWriteStateMap,
+ // otherwise, the result might be corrupt.
+ // don't need to perform a deep copy, because RowData elements will not be updated
+ dataState.put(lastKey, new ArrayList<>(lastList));
}
- if (input.equals(lastElement)) {
+ if (size == 0 || input.equals(lastElement)) {
return;
} else {
// lastElement shouldn't be null
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/TopNBuffer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/TopNBuffer.java
index 5d3ec52..3c8f7f5 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/TopNBuffer.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/TopNBuffer.java
@@ -23,7 +23,7 @@ import org.apache.flink.table.data.RowData;
import java.io.Serializable;
import java.util.Collection;
import java.util.Comparator;
-import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
@@ -94,12 +94,12 @@ class TopNBuffer implements Serializable {
}
public void remove(RowData sortKey, RowData value) {
- Collection<RowData> list = treeMap.get(sortKey);
- if (list != null) {
- if (list.remove(value)) {
+ Collection<RowData> collection = treeMap.get(sortKey);
+ if (collection != null) {
+ if (collection.remove(value)) {
currentTopNum -= 1;
}
- if (list.size() == 0) {
+ if (collection.size() == 0) {
treeMap.remove(sortKey);
}
}
@@ -111,9 +111,9 @@ class TopNBuffer implements Serializable {
* @param sortKey key to remove
*/
void removeAll(RowData sortKey) {
- Collection<RowData> list = treeMap.get(sortKey);
- if (list != null) {
- currentTopNum -= list.size();
+ Collection<RowData> collection = treeMap.get(sortKey);
+ if (collection != null) {
+ currentTopNum -= collection.size();
treeMap.remove(sortKey);
}
}
@@ -127,14 +127,28 @@ class TopNBuffer implements Serializable {
Map.Entry<RowData, Collection<RowData>> last = treeMap.lastEntry();
RowData lastElement = null;
if (last != null) {
- Collection<RowData> list = last.getValue();
- lastElement = getLastElement(list);
- if (lastElement != null) {
- if (list.remove(lastElement)) {
- currentTopNum -= 1;
- }
- if (list.size() == 0) {
- treeMap.remove(last.getKey());
+ Collection<RowData> collection = last.getValue();
+ if (collection != null) {
+ if (collection instanceof List) {
+ // optimization for List
+ List<RowData> list = (List<RowData>) collection;
+ if (!list.isEmpty()) {
+ lastElement = list.remove(list.size() - 1);
+ currentTopNum -= 1;
+ if (list.isEmpty()) {
+ treeMap.remove(last.getKey());
+ }
+ }
+ } else {
+ lastElement = getLastElement(collection);
+ if (lastElement != null) {
+ if (collection.remove(lastElement)) {
+ currentTopNum -= 1;
+ }
+ if (collection.size() == 0) {
+ treeMap.remove(last.getKey());
+ }
+ }
}
}
}
@@ -142,6 +156,19 @@ class TopNBuffer implements Serializable {
}
/**
+ * Returns the last record of the last Entry in the buffer.
+ */
+ RowData lastElement() {
+ Map.Entry<RowData, Collection<RowData>> last = treeMap.lastEntry();
+ RowData lastElement = null;
+ if (last != null) {
+ Collection<RowData> collection = last.getValue();
+ lastElement = getLastElement(collection);
+ }
+ return lastElement;
+ }
+
+ /**
* Gets record which rank is given value.
*
* @param rank rank value to search
@@ -150,28 +177,32 @@ class TopNBuffer implements Serializable {
RowData getElement(int rank) {
int curRank = 0;
for (Map.Entry<RowData, Collection<RowData>> entry : treeMap.entrySet()) {
- Collection<RowData> list = entry.getValue();
-
- if (curRank + list.size() >= rank) {
- for (RowData elem : list) {
+ Collection<RowData> collection = entry.getValue();
+ if (curRank + collection.size() >= rank) {
+ for (RowData elem : collection) {
curRank += 1;
if (curRank == rank) {
return elem;
}
}
} else {
- curRank += list.size();
+ curRank += collection.size();
}
}
return null;
}
- private RowData getLastElement(Collection<RowData> list) {
+ private RowData getLastElement(Collection<RowData> collection) {
RowData element = null;
- if (list != null && !list.isEmpty()) {
- Iterator<RowData> iter = list.iterator();
- while (iter.hasNext()) {
- element = iter.next();
+ if (collection != null && !collection.isEmpty()) {
+ if (collection instanceof List) {
+ // optimize for List
+ List<RowData> list = (List<RowData>) collection;
+ return list.get(list.size() - 1);
+ } else {
+ for (RowData data : collection) {
+ element = data;
+ }
}
}
return element;