You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/15 03:55:42 UTC

[flink] branch release-1.9 updated: [FLINK-13236][table-runtime-blink] Fix bug and improve performance in TopNBuffer (#9098)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new a15834a  [FLINK-13236][table-runtime-blink] Fix bug and improve performance in TopNBuffer (#9098)
a15834a is described below

commit a15834a83d9caf100036df385ff041b0ac9a29be
Author: TsReaper <ts...@gmail.com>
AuthorDate: Mon Jul 15 11:51:37 2019 +0800

    [FLINK-13236][table-runtime-blink] Fix bug and improve performance in TopNBuffer (#9098)
---
 .../flink/table/runtime/rank/TopNBuffer.java       | 22 +++++++++++++---------
 1 file changed, 13 insertions(+), 9 deletions(-)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/TopNBuffer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/TopNBuffer.java
index 8600d91..794c4f9 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/TopNBuffer.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/TopNBuffer.java
@@ -75,6 +75,10 @@ class TopNBuffer implements Serializable {
 	 * @param values record lists to be associated with the specified key
 	 */
 	void putAll(BaseRow sortKey, Collection<BaseRow> values) {
+		Collection<BaseRow> oldValues = treeMap.get(sortKey);
+		if (oldValues != null) {
+			currentTopNum -= oldValues.size();
+		}
 		treeMap.put(sortKey, values);
 		currentTopNum += values.size();
 	}
@@ -145,18 +149,18 @@ class TopNBuffer implements Serializable {
 	 */
 	BaseRow getElement(int rank) {
 		int curRank = 0;
-		Iterator<Map.Entry<BaseRow, Collection<BaseRow>>> iter = treeMap.entrySet().iterator();
-		while (iter.hasNext()) {
-			Map.Entry<BaseRow, Collection<BaseRow>> entry = iter.next();
+		for (Map.Entry<BaseRow, Collection<BaseRow>> entry : treeMap.entrySet()) {
 			Collection<BaseRow> list = entry.getValue();
 
-			Iterator<BaseRow> listIter = list.iterator();
-			while (listIter.hasNext()) {
-				BaseRow elem = listIter.next();
-				curRank += 1;
-				if (curRank == rank) {
-					return elem;
+			if (curRank + list.size() >= rank) {
+				for (BaseRow elem : list) {
+					curRank += 1;
+					if (curRank == rank) {
+						return elem;
+					}
 				}
+			} else {
+				curRank += list.size();
 			}
 		}
 		return null;