You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/15 03:55:42 UTC
[flink] branch release-1.9 updated: [FLINK-13236][table-runtime-blink] Fix bug and improve performance in TopNBuffer (#9098)
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new a15834a [FLINK-13236][table-runtime-blink] Fix bug and improve performance in TopNBuffer (#9098)
a15834a is described below
commit a15834a83d9caf100036df385ff041b0ac9a29be
Author: TsReaper <ts...@gmail.com>
AuthorDate: Mon Jul 15 11:51:37 2019 +0800
[FLINK-13236][table-runtime-blink] Fix bug and improve performance in TopNBuffer (#9098)
---
.../flink/table/runtime/rank/TopNBuffer.java | 22 +++++++++++++---------
1 file changed, 13 insertions(+), 9 deletions(-)
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/TopNBuffer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/TopNBuffer.java
index 8600d91..794c4f9 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/TopNBuffer.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/TopNBuffer.java
@@ -75,6 +75,10 @@ class TopNBuffer implements Serializable {
* @param values record lists to be associated with the specified key
*/
void putAll(BaseRow sortKey, Collection<BaseRow> values) {
+ Collection<BaseRow> oldValues = treeMap.get(sortKey);
+ if (oldValues != null) {
+ currentTopNum -= oldValues.size();
+ }
treeMap.put(sortKey, values);
currentTopNum += values.size();
}
@@ -145,18 +149,18 @@ class TopNBuffer implements Serializable {
*/
BaseRow getElement(int rank) {
int curRank = 0;
- Iterator<Map.Entry<BaseRow, Collection<BaseRow>>> iter = treeMap.entrySet().iterator();
- while (iter.hasNext()) {
- Map.Entry<BaseRow, Collection<BaseRow>> entry = iter.next();
+ for (Map.Entry<BaseRow, Collection<BaseRow>> entry : treeMap.entrySet()) {
Collection<BaseRow> list = entry.getValue();
- Iterator<BaseRow> listIter = list.iterator();
- while (listIter.hasNext()) {
- BaseRow elem = listIter.next();
- curRank += 1;
- if (curRank == rank) {
- return elem;
+ if (curRank + list.size() >= rank) {
+ for (BaseRow elem : list) {
+ curRank += 1;
+ if (curRank == rank) {
+ return elem;
+ }
}
+ } else {
+ curRank += list.size();
}
}
return null;