Posted to issues@flink.apache.org by ChengXiangLi <gi...@git.apache.org> on 2015/09/22 11:04:44 UTC

[GitHub] flink pull request: [FLINK-2549] add topK operator to DataSet.

GitHub user ChengXiangLi opened a pull request:

    https://github.com/apache/flink/pull/1161

    [FLINK-2549] add topK operator to DataSet.

    The topK operator is implemented with `mapPartition()` followed by `reduceGroup()`: each map task selects its local top k elements and sends them to a singleton reduce task (there is no `groupBy()` before `reduceGroup()`), which then selects the final top k elements.
    The main parts of this implementation:
    1. An out-of-core PriorityQueue implementation backed by self-managed memory.
    2. Use of runtime resources to build the PriorityQueue for the UDF. topK is actually not a real 'native' operator; it contains a UDF that uses the managed-memory-based PriorityQueue to select the top k elements.
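    The two-phase selection described above can be illustrated with a small, self-contained sketch (plain Java with hypothetical names, no Flink APIs; the real implementation uses the managed-memory PriorityQueue rather than `java.util.PriorityQueue`):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Illustrative two-phase top-k: each "partition" keeps a local top-k in a
 *  size-bounded min-heap (the mapPartition phase), then the local winners
 *  are merged and the global top-k is selected (the reduceGroup phase). */
public class TopKSketch {

    /** Select the k largest elements of one partition with a bounded min-heap. */
    static List<Integer> localTopK(List<Integer> partition, int k) {
        PriorityQueue<Integer> heap = new PriorityQueue<>(k); // min-heap
        for (int value : partition) {
            if (heap.size() < k) {
                heap.offer(value);
            } else if (value > heap.peek()) {
                heap.poll();          // evict the current minimum
                heap.offer(value);
            }
        }
        return new ArrayList<>(heap); // note: heap iteration order is unordered
    }

    /** Merge the per-partition winners and pick the global top-k. */
    static List<Integer> globalTopK(List<List<Integer>> localResults, int k) {
        List<Integer> merged = new ArrayList<>();
        for (List<Integer> local : localResults) {
            merged.addAll(local);
        }
        merged.sort(Comparator.reverseOrder());
        return merged.subList(0, Math.min(k, merged.size()));
    }

    public static void main(String[] args) {
        List<Integer> p1 = List.of(5, 1, 9, 3);
        List<Integer> p2 = List.of(7, 2, 8, 6);
        List<Integer> top3 =
                globalTopK(List.of(localTopK(p1, 3), localTopK(p2, 3)), 3);
        System.out.println(top3); // [9, 8, 7]
    }
}
```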

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ChengXiangLi/flink priorityqueue

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1161.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1161
    
----
commit 7165d5ba78376cec1b3c752afb949e6273d45869
Author: chengxiang li <ch...@intel.com>
Date:   2015-09-16T02:10:05Z

    [FLINK-2549] add topK operator to DataSet.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2549] add topK operator to DataSet.

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1161#discussion_r40230604
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java ---
    @@ -447,11 +447,13 @@ public void writeToOutput(final ChannelWriterOutputView output, final int start,
     				num -= recordsPerSegment;
     			} else {
     				// partially filled segment
    -				for (; num > 0; num--) {
    +				for (; num > 0 && offset <= this.lastEntryOffset; num--, offset += this.recordSize) {
    --- End diff --
    
    Is this a bug in the current implementation? If yes, that is critical.
    
    We should pull this into a dedicated fix and add a test in that case.



[GitHub] flink pull request: [FLINK-2549] add topK operator to DataSet.

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on the pull request:

    https://github.com/apache/flink/pull/1161#issuecomment-161501441
  
    Hi, @StephanEwen, is there any progress on the Managed Memory Allocation abstractions for UDFs? This is not only about the TopK operator; I think it's also very important for users and DSLs to build more robust and efficient applications. For example, in Table API queries, since the data schema is predictable during each phase of processing, we do not need to create real `Row` objects; we can just store the binary data in self-managed memory and use offsets to read the `Row` fields. All the intermediate data would then be stored as binary in self-managed memory, with no need to create lots of `Row` objects and their field objects, which should be more robust, more memory-efficient, and faster.
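    The offset-based binary row access sketched in this comment could look roughly like the following (a simplified illustration with a hypothetical fixed row layout and class names; a real design would sit on Flink's `MemorySegment`s rather than `ByteBuffer`):

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch of offset-based field access on binary rows.
 *  Assumed fixed layout per row: (int id, long timestamp, double amount). */
public class BinaryRowSketch {
    static final int ID_OFFSET = 0;
    static final int TS_OFFSET = 4;
    static final int AMOUNT_OFFSET = 12;
    static final int ROW_SIZE = 20;

    /** Serialize one row at a fixed position; no Row object is created. */
    static void writeRow(ByteBuffer buf, int row, int id, long ts, double amount) {
        int base = row * ROW_SIZE;
        buf.putInt(base + ID_OFFSET, id);
        buf.putLong(base + TS_OFFSET, ts);
        buf.putDouble(base + AMOUNT_OFFSET, amount);
    }

    /** Read a single field directly from the binary data by offset. */
    static int readId(ByteBuffer buf, int row) {
        return buf.getInt(row * ROW_SIZE + ID_OFFSET);
    }

    static double readAmount(ByteBuffer buf, int row) {
        return buf.getDouble(row * ROW_SIZE + AMOUNT_OFFSET);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(2 * ROW_SIZE);
        writeRow(buf, 0, 42, 1000L, 3.5);
        writeRow(buf, 1, 43, 2000L, 7.25);
        System.out.println(readId(buf, 1) + " " + readAmount(buf, 0)); // 43 3.5
    }
}
```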



[GitHub] flink pull request: [FLINK-2549] add topK operator to DataSet.

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1161#issuecomment-142672766
  
    This looks super impressive and very well tested.
    
    The way that the operator is integrated into the system needs some improvement, though. The problem is mainly how the managed memory is obtained.
    
    The MemoryManager's memory is shared among all concurrently running tasks. This implementation takes up to half the total memory, which will cause programs that have other memory consumers in the same pipeline to crash. The tests here run, because the operator is executed in isolation, with no other memory consuming operators in the test program.
    
    Memory consumers need to be known to the Optimizer (in the program generation) to compute what maximal fraction of memory a certain consumer may request. That value is part of the Task's configuration and used by the memory consumer to obtain the right maximum amount.
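    The fraction-based budgeting described here boils down to simple arithmetic (a sketch with hypothetical names; in reality the optimizer computes each operator's relative memory share during plan generation and puts it in the task configuration):

```java
/** Hypothetical sketch: the planner assigns each memory-consuming operator a
 *  relative fraction of the managed memory; at runtime the consumer converts
 *  that fraction into a concrete number of memory pages to request. */
public class MemoryFractionSketch {
    static int pagesForConsumer(long totalManagedMemory, int pageSize, double fraction) {
        return (int) ((long) (totalManagedMemory * fraction) / pageSize);
    }

    public static void main(String[] args) {
        long total = 64L * 1024 * 1024;   // 64 MiB of managed memory
        // with two consumers in the pipeline, each is budgeted half of the
        // memory, instead of one consumer grabbing it all and starving the other
        System.out.println(pagesForConsumer(total, 32 * 1024, 0.5)); // 1024 pages
    }
}
```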
    
    Integrating operators into the optimizer's planning is a bit tedious and not as easy as it could be (we did not get around to refactoring this so far, unfortunately). Maybe we can add some tooling that would mark a UDF as MemoryConsuming and would in that case expose a Memory Allocator that returns the right amount of memory.
    
    What we could do is the following: I will try to get to refactoring some of the Managed Memory Allocation abstractions (we need this anyways for more components) and then expose a MemoryAllocator in the runtime context, which is accessible if a user-defined function has been annotated as a memory consumer.
    
    This may take me two weeks (I am currently in the midst of working on the streaming windows), but if you don't mind letting this rest for a few days, I think that is the cleanest approach.
    
    The other parts of the code look good, so after I finish my part, it should be a simple rebase of the TopKMapPartition function and the TopKReducer, and then this is good to merge.
    
    What do you think?



[GitHub] flink pull request: [FLINK-2549] add topK operator to DataSet.

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1161#issuecomment-162011423
  
    @ChengXiangLi Sorry for letting you wait, I have not forgotten about this pull request.
    
    There are two things in your comment:
    
      1. Exposing Managed Memory to UDFs (this pull request), which is much more convenient than going through the implementation of a deeply integrated operator.
    
  2. Efficiency for APIs like the Table API. The Table API works on managed memory already, since it sits on top of Flink's join/sort/etc. What you are hinting at is to have a lower-level interface where functions get the memory segments, rather than the row objects, and work directly on the memory segments. That has been a long-standing wish of mine as well, but it involves having a separate type of function that supports working with memory segments. Plus more machinery to handle records that are too large to fit into individual segments.
    
    I am still on point (1). I aimed a bit too high with how I wanted to abstract that, but will continue.
    
    I would love to see point (2) at some point. If you are eager to drive point (2), I'd be very happy. We should probably have a chat and get this designed, as it involves quite a few things (exposing other abstractions, spanning records, etc.). 
    




[GitHub] flink pull request: [FLINK-2549] add topK operator to DataSet.

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1161#discussion_r40280217
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java ---
    @@ -447,11 +447,13 @@ public void writeToOutput(final ChannelWriterOutputView output, final int start,
     				num -= recordsPerSegment;
     			} else {
     				// partially filled segment
    -				for (; num > 0; num--) {
    +				for (; num > 0 && offset <= this.lastEntryOffset; num--, offset += this.recordSize) {
    --- End diff --
    
    Yes, it is. Previously it only supported writing data within a single `MemorySegment`; it worked before because it was only ever called with `num = 1`. I will create a separate JIRA for this.
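    A minimal model of the fixed loop (hypothetical names; the real code iterates over a `MemorySegment` in `FixedLengthRecordSorter.writeToOutput()`): without the `offset <= lastEntryOffset` guard, a partially filled segment would be read past its last valid record.

```java
import java.io.ByteArrayOutputStream;

/** Simplified model of copying records out of a partially filled segment. */
public class SegmentCopySketch {
    static int copyRecords(byte[] segment, int recordSize, int lastEntryOffset,
                           int num, ByteArrayOutputStream out) {
        int offset = 0;
        int written = 0;
        // the buggy loop checked only `num > 0`; the added offset guard stops
        // the copy at the last record actually present in the segment
        for (; num > 0 && offset <= lastEntryOffset; num--, offset += recordSize) {
            out.write(segment, offset, recordSize);
            written++;
        }
        return written;
    }

    public static void main(String[] args) {
        byte[] segment = new byte[32];          // room for 4 records of 8 bytes
        int lastEntryOffset = 16;               // but only 3 records are present
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int written = copyRecords(segment, 8, lastEntryOffset, 10, out);
        System.out.println(written);            // 3, not 10
    }
}
```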



[GitHub] flink pull request: [FLINK-2549] add topK operator to DataSet.

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on the pull request:

    https://github.com/apache/flink/pull/1161#issuecomment-142782327
  
    Thanks, @StephanEwen, I totally agree with you about the memory allocation. I actually tried to understand the memory allocation logic and make the UDF consumer part of it, but it turned out not to be easy and broke many things; it's better to leave the refactoring to someone who fully understands it, like you. I will wait for your refactoring work.
    BTW: it takes half of the total memory as an upper bound, but I guess that does not make any difference :)

