You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/09 03:09:44 UTC

[flink] branch release-1.11 updated: [FLINK-17625][table-runtime-blink] Fix ArrayIndexOutOfBoundsException in AppendOnlyTopNFunction

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 11958b0  [FLINK-17625][table-runtime-blink] Fix ArrayIndexOutOfBoundsException in AppendOnlyTopNFunction
11958b0 is described below

commit 11958b0dd424c143184ed8fd5a2943049b27a4b4
Author: lsy <ld...@163.com>
AuthorDate: Tue Jun 9 11:04:59 2020 +0800

    [FLINK-17625][table-runtime-blink] Fix ArrayIndexOutOfBoundsException in AppendOnlyTopNFunction
    
    This closes #12303
---
 .../operators/rank/AppendOnlyTopNFunction.java     | 16 +++--
 .../table/runtime/operators/rank/TopNBuffer.java   | 83 +++++++++++++++-------
 2 files changed, 68 insertions(+), 31 deletions(-)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.java
index 6a3cf5a..82e2fae 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.java
@@ -211,16 +211,22 @@ public class AppendOnlyTopNFunction extends AbstractTopNFunction {
 		if (buffer.getCurrentTopNum() > rankEnd) {
 			Map.Entry<RowData, Collection<RowData>> lastEntry = buffer.lastEntry();
 			RowData lastKey = lastEntry.getKey();
-			List<RowData> lastList = (List<RowData>) lastEntry.getValue();
+			Collection<RowData> lastList = lastEntry.getValue();
+			RowData lastElement = buffer.lastElement();
+			int size = lastList.size();
 			// remove last one
-			RowData lastElement = lastList.remove(lastList.size() - 1);
-			if (lastList.isEmpty()) {
+			if (size <= 1) {
 				buffer.removeAll(lastKey);
 				dataState.remove(lastKey);
 			} else {
-				dataState.put(lastKey, lastList);
+				buffer.removeLast();
+				// last element has been removed from lastList, we have to copy a new collection
+				// for lastList to avoid mutating state values, see CopyOnWriteStateMap,
+				// otherwise, the result might be corrupt.
+				// don't need to perform a deep copy, because RowData elements will not be updated
+				dataState.put(lastKey, new ArrayList<>(lastList));
 			}
-			if (input.equals(lastElement)) {
+			if (size == 0 || input.equals(lastElement)) {
 				return;
 			} else {
 				// lastElement shouldn't be null
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/TopNBuffer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/TopNBuffer.java
index 5d3ec52..3c8f7f5 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/TopNBuffer.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/TopNBuffer.java
@@ -23,7 +23,7 @@ import org.apache.flink.table.data.RowData;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.Comparator;
-import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
@@ -94,12 +94,12 @@ class TopNBuffer implements Serializable {
 	}
 
 	public void remove(RowData sortKey, RowData value) {
-		Collection<RowData> list = treeMap.get(sortKey);
-		if (list != null) {
-			if (list.remove(value)) {
+		Collection<RowData> collection = treeMap.get(sortKey);
+		if (collection != null) {
+			if (collection.remove(value)) {
 				currentTopNum -= 1;
 			}
-			if (list.size() == 0) {
+			if (collection.size() == 0) {
 				treeMap.remove(sortKey);
 			}
 		}
@@ -111,9 +111,9 @@ class TopNBuffer implements Serializable {
 	 * @param sortKey key to remove
 	 */
 	void removeAll(RowData sortKey) {
-		Collection<RowData> list = treeMap.get(sortKey);
-		if (list != null) {
-			currentTopNum -= list.size();
+		Collection<RowData> collection = treeMap.get(sortKey);
+		if (collection != null) {
+			currentTopNum -= collection.size();
 			treeMap.remove(sortKey);
 		}
 	}
@@ -127,14 +127,28 @@ class TopNBuffer implements Serializable {
 		Map.Entry<RowData, Collection<RowData>> last = treeMap.lastEntry();
 		RowData lastElement = null;
 		if (last != null) {
-			Collection<RowData> list = last.getValue();
-			lastElement = getLastElement(list);
-			if (lastElement != null) {
-				if (list.remove(lastElement)) {
-					currentTopNum -= 1;
-				}
-				if (list.size() == 0) {
-					treeMap.remove(last.getKey());
+			Collection<RowData> collection = last.getValue();
+			if (collection != null) {
+				if (collection instanceof List) {
+					// optimization for List
+					List<RowData> list = (List<RowData>) collection;
+					if (!list.isEmpty()) {
+						lastElement = list.remove(list.size() - 1);
+						currentTopNum -= 1;
+						if (list.isEmpty()) {
+							treeMap.remove(last.getKey());
+						}
+					}
+				} else {
+					lastElement = getLastElement(collection);
+					if (lastElement != null) {
+						if (collection.remove(lastElement)) {
+							currentTopNum -= 1;
+						}
+						if (collection.size() == 0) {
+							treeMap.remove(last.getKey());
+						}
+					}
 				}
 			}
 		}
@@ -142,6 +156,19 @@ class TopNBuffer implements Serializable {
 	}
 
 	/**
+	 * Returns the last record of the last Entry in the buffer.
+	 */
+	RowData lastElement() {
+		Map.Entry<RowData, Collection<RowData>> last = treeMap.lastEntry();
+		RowData lastElement = null;
+		if (last != null) {
+			Collection<RowData> collection = last.getValue();
+			lastElement = getLastElement(collection);
+		}
+		return lastElement;
+	}
+
+	/**
 	 * Gets record which rank is given value.
 	 *
 	 * @param rank rank value to search
@@ -150,28 +177,32 @@ class TopNBuffer implements Serializable {
 	RowData getElement(int rank) {
 		int curRank = 0;
 		for (Map.Entry<RowData, Collection<RowData>> entry : treeMap.entrySet()) {
-			Collection<RowData> list = entry.getValue();
-
-			if (curRank + list.size() >= rank) {
-				for (RowData elem : list) {
+			Collection<RowData> collection = entry.getValue();
+			if (curRank + collection.size() >= rank) {
+				for (RowData elem : collection) {
 					curRank += 1;
 					if (curRank == rank) {
 						return elem;
 					}
 				}
 			} else {
-				curRank += list.size();
+				curRank += collection.size();
 			}
 		}
 		return null;
 	}
 
-	private RowData getLastElement(Collection<RowData> list) {
+	private RowData getLastElement(Collection<RowData> collection) {
 		RowData element = null;
-		if (list != null && !list.isEmpty()) {
-			Iterator<RowData> iter = list.iterator();
-			while (iter.hasNext()) {
-				element = iter.next();
+		if (collection != null && !collection.isEmpty()) {
+			if (collection instanceof List) {
+				// optimize for List
+				List<RowData> list = (List<RowData>) collection;
+				return list.get(list.size() - 1);
+			} else {
+				for (RowData data : collection) {
+					element = data;
+				}
 			}
 		}
 		return element;