You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Jark Wu (JIRA)" <ji...@apache.org> on 2019/07/15 03:57:00 UTC

[jira] [Resolved] (FLINK-13236) Fix bug and improve performance in TopNBuffer

     [ https://issues.apache.org/jira/browse/FLINK-13236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu resolved FLINK-13236.
-----------------------------
    Resolution: Fixed

Fixed in 1.9.0: a15834a83d9caf100036df385ff041b0ac9a29be
1.10.0: 6d7c1bd94ae817de20f95328c76eda719b896c74

> Fix bug and improve performance in TopNBuffer
> ---------------------------------------------
>
>                 Key: FLINK-13236
>                 URL: https://issues.apache.org/jira/browse/FLINK-13236
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Caizhi Weng
>            Assignee: Caizhi Weng
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 1.9.0, 1.10.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> In {{TopNBuffer}} we have the following method:
> {code:java}
> /**
>  * Puts a record list into the buffer under the sortKey.
>  * Note: if buffer already contains sortKey, putAll will overwrite the previous value
>  *
>  * @param sortKey sort key with which the specified values are to be associated
>  * @param values record lists to be associated with the specified key
>  */
> void putAll(BaseRow sortKey, Collection<BaseRow> values) {
> 	treeMap.put(sortKey, values);
> 	currentTopNum += values.size();
> }
> {code}
> When {{sortKey}} already exists in {{treeMap}}, the {{currentTopNum}} should be first subtracted by the old {{value.size()}} in {{treeMap}} then added (can't be directly added). As currently only {{AppendOnlyTopNFunction}} uses this method in its init procedure, this bug is not triggered.
> {code:java}
> /**
>  * Gets record which rank is given value.
>  *
>  * @param rank rank value to search
>  * @return the record which rank is given value
>  */
> BaseRow getElement(int rank) {
> 	int curRank = 0;
> 	Iterator<Map.Entry<BaseRow, Collection<BaseRow>>> iter = treeMap.entrySet().iterator();
> 	while (iter.hasNext()) {
> 		Map.Entry<BaseRow, Collection<BaseRow>> entry = iter.next();
> 		Collection<BaseRow> list = entry.getValue();
> 		Iterator<BaseRow> listIter = list.iterator();
> 		while (listIter.hasNext()) {
> 			BaseRow elem = listIter.next();
> 			curRank += 1;
> 			if (curRank == rank) {
> 				return elem;
> 			}
> 		}
> 	}
> 	return null;
> }
> {code}
> We can remove the inner loop by adding {{curRank}} by {{list.size()}} each time.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)