You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Ivan Mushketyk <iv...@gmail.com> on 2017/01/24 09:10:49 UTC

How to get top N elements in a DataSet?

Hi,

I have a dataset of tuples with two fields ids and ratings and I need to
find 10 elements with the highest rating in this dataset. I found a
solution, but I think it's suboptimal and I think there should be a better
way to do it.

The best thing that I came up with is to partition dataset by rating, sort
locally and write the partitioned dataset to disk:

dataset
.partitionCustom(new Partitioner<Double>() {
  @Override
  public int partition(Double key, int numPartitions) {
    return key.intValue() % numPartitions;
  }
}, 1) . // partition by rating
.setParallelism(5)
.sortPartition(1, Order.DESCENDING) // locally sort by rating
.writeAsText("..."); // write the partitioned dataset to disk

This will store tuples in sorted files with names 5, 4, 3, ... that contain
ratings in ranges (5, 4], (4, 3], and so on. Then I can read sorted data
from disk and and N elements with the highest rating.
Is there a way to do the same but without writing a partitioned dataset to
a disk?

I tried to use "first(10)" but it seems to give top 10 items from a random
partition. Is there a way to get top N elements from every partition? Then
I could locally sort top values from every partition and find top 10 global
values.

Best regards,
Ivan.

Re: How to get top N elements in a DataSet?

Posted by Ivan Mushketyk <iv...@gmail.com>.
Hi @Fabian, @Gabor, and @Aljoscha,

Thank you for your help! It works as expected.

Best regards,
Ivan.

On Tue, 24 Jan 2017 at 17:04 Fabian Hueske <fh...@gmail.com> wrote:

> Aljoscha, you are right.
> The second mapPartition() needs to have parallelism(1), but the
> sortPartition() as well:
>
>
> dataset // assuming some partitioning that can be reused to avoid a shuffle
>   .sortPartition(1, Order.DESCENDING)
>   .mapPartition(new ReturnFirstTen())
>   .sortPartition(1, Order.DESCENDING).parallelism(1)
>   .mapPartition(new ReturnFirstTen()).parallelism(1)
>
> Anyway, as Gabor pointed out, this solution is very in efficient.
>
> 2017-01-24 17:52 GMT+01:00 Aljoscha Krettek <al...@apache.org>:
>
> @Fabian, I think there's a typo in your code, shouldn't it be
>
> dataset // assuming some partitioning that can be reused to avoid a shuffle
>   .sortPartition(1, Order.DESCENDING)
>   .mapPartition(new ReturnFirstTen())
>   .sortPartition(1, Order.DESCENDING)
>   .mapPartition(new ReturnFirstTen()).parallelism(1)
>
> i.e. the second MapPartition has to be parallelism=1
>
>
> On Tue, 24 Jan 2017 at 11:57 Fabian Hueske <fh...@gmail.com> wrote:
>
> You are of course right Gabor.
> @Ivan, you can use a heap in the MapPartitionFunction to collect the top
> 10 elements (note that you need to create deep-copies if object reuse is
> enabled [1]).
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/batch/index.html#operating-on-data-objects-in-functions
>
>
> 2017-01-24 11:49 GMT+01:00 Gábor Gévay <gg...@gmail.com>:
>
> Hello,
>
> Btw. there is a Jira about this:
> https://issues.apache.org/jira/browse/FLINK-2549
> Note that the discussion there suggests a more efficient approach,
> which doesn't involve sorting the entire partitions.
>
> And if I remember correctly, this question comes up from time to time
> on the mailing list.
>
> Best,
> Gábor
>
>
>
> 2017-01-24 11:35 GMT+01:00 Fabian Hueske <fh...@gmail.com>:
> > Hi Ivan,
> >
> > I think you can use MapPartition for that.
> > So basically:
> >
> > dataset // assuming some partitioning that can be reused to avoid a
> shuffle
> >   .sortPartition(1, Order.DESCENDING)
> >   .mapPartition(new ReturnFirstTen())
> >   .sortPartition(1, Order.DESCENDING).parallelism(1)
> >   .mapPartition(new ReturnFirstTen())
> >
> > Best, Fabian
> >
> >
> > 2017-01-24 10:10 GMT+01:00 Ivan Mushketyk <iv...@gmail.com>:
> >>
> >> Hi,
> >>
> >> I have a dataset of tuples with two fields ids and ratings and I need to
> >> find 10 elements with the highest rating in this dataset. I found a
> >> solution, but I think it's suboptimal and I think there should be a
> better
> >> way to do it.
> >>
> >> The best thing that I came up with is to partition dataset by rating,
> sort
> >> locally and write the partitioned dataset to disk:
> >>
> >> dataset
> >> .partitionCustom(new Partitioner<Double>() {
> >>   @Override
> >>   public int partition(Double key, int numPartitions) {
> >>     return key.intValue() % numPartitions;
> >>   }
> >> }, 1) . // partition by rating
> >> .setParallelism(5)
> >> .sortPartition(1, Order.DESCENDING) // locally sort by rating
> >> .writeAsText("..."); // write the partitioned dataset to disk
> >>
> >> This will store tuples in sorted files with names 5, 4, 3, ... that
> >> contain ratings in ranges (5, 4], (4, 3], and so on. Then I can read
> sorted
> >> data from disk and and N elements with the highest rating.
> >> Is there a way to do the same but without writing a partitioned dataset
> to
> >> a disk?
> >>
> >> I tried to use "first(10)" but it seems to give top 10 items from a
> random
> >> partition. Is there a way to get top N elements from every partition?
> Then I
> >> could locally sort top values from every partition and find top 10
> global
> >> values.
> >>
> >> Best regards,
> >> Ivan.
> >>
> >>
> >
>
>
>
>

Re: How to get top N elements in a DataSet?

Posted by Fabian Hueske <fh...@gmail.com>.
Aljoscha, you are right.
The second mapPartition() needs to have parallelism(1), but the
sortPartition() as well:

dataset // assuming some partitioning that can be reused to avoid a shuffle
  .sortPartition(1, Order.DESCENDING)
  .mapPartition(new ReturnFirstTen())
  .sortPartition(1, Order.DESCENDING).parallelism(1)
  .mapPartition(new ReturnFirstTen()).parallelism(1)

Anyway, as Gabor pointed out, this solution is very in efficient.

2017-01-24 17:52 GMT+01:00 Aljoscha Krettek <al...@apache.org>:

> @Fabian, I think there's a typo in your code, shouldn't it be
>
> dataset // assuming some partitioning that can be reused to avoid a shuffle
>   .sortPartition(1, Order.DESCENDING)
>   .mapPartition(new ReturnFirstTen())
>   .sortPartition(1, Order.DESCENDING)
>   .mapPartition(new ReturnFirstTen()).parallelism(1)
>
> i.e. the second MapPartition has to be parallelism=1
>
>
> On Tue, 24 Jan 2017 at 11:57 Fabian Hueske <fh...@gmail.com> wrote:
>
>> You are of course right Gabor.
>> @Ivan, you can use a heap in the MapPartitionFunction to collect the top
>> 10 elements (note that you need to create deep-copies if object reuse is
>> enabled [1]).
>>
>> Best, Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-
>> release-1.1/apis/batch/index.html#operating-on-data-objects-in-functions
>>
>>
>> 2017-01-24 11:49 GMT+01:00 Gábor Gévay <gg...@gmail.com>:
>>
>> Hello,
>>
>> Btw. there is a Jira about this:
>> https://issues.apache.org/jira/browse/FLINK-2549
>> Note that the discussion there suggests a more efficient approach,
>> which doesn't involve sorting the entire partitions.
>>
>> And if I remember correctly, this question comes up from time to time
>> on the mailing list.
>>
>> Best,
>> Gábor
>>
>>
>>
>> 2017-01-24 11:35 GMT+01:00 Fabian Hueske <fh...@gmail.com>:
>> > Hi Ivan,
>> >
>> > I think you can use MapPartition for that.
>> > So basically:
>> >
>> > dataset // assuming some partitioning that can be reused to avoid a
>> shuffle
>> >   .sortPartition(1, Order.DESCENDING)
>> >   .mapPartition(new ReturnFirstTen())
>> >   .sortPartition(1, Order.DESCENDING).parallelism(1)
>> >   .mapPartition(new ReturnFirstTen())
>> >
>> > Best, Fabian
>> >
>> >
>> > 2017-01-24 10:10 GMT+01:00 Ivan Mushketyk <iv...@gmail.com>:
>> >>
>> >> Hi,
>> >>
>> >> I have a dataset of tuples with two fields ids and ratings and I need
>> to
>> >> find 10 elements with the highest rating in this dataset. I found a
>> >> solution, but I think it's suboptimal and I think there should be a
>> better
>> >> way to do it.
>> >>
>> >> The best thing that I came up with is to partition dataset by rating,
>> sort
>> >> locally and write the partitioned dataset to disk:
>> >>
>> >> dataset
>> >> .partitionCustom(new Partitioner<Double>() {
>> >>   @Override
>> >>   public int partition(Double key, int numPartitions) {
>> >>     return key.intValue() % numPartitions;
>> >>   }
>> >> }, 1) . // partition by rating
>> >> .setParallelism(5)
>> >> .sortPartition(1, Order.DESCENDING) // locally sort by rating
>> >> .writeAsText("..."); // write the partitioned dataset to disk
>> >>
>> >> This will store tuples in sorted files with names 5, 4, 3, ... that
>> >> contain ratings in ranges (5, 4], (4, 3], and so on. Then I can read
>> sorted
>> >> data from disk and and N elements with the highest rating.
>> >> Is there a way to do the same but without writing a partitioned
>> dataset to
>> >> a disk?
>> >>
>> >> I tried to use "first(10)" but it seems to give top 10 items from a
>> random
>> >> partition. Is there a way to get top N elements from every partition?
>> Then I
>> >> could locally sort top values from every partition and find top 10
>> global
>> >> values.
>> >>
>> >> Best regards,
>> >> Ivan.
>> >>
>> >>
>> >
>>
>>
>>

Re: How to get top N elements in a DataSet?

Posted by Aljoscha Krettek <al...@apache.org>.
@Fabian, I think there's a typo in your code, shouldn't it be

dataset // assuming some partitioning that can be reused to avoid a shuffle
  .sortPartition(1, Order.DESCENDING)
  .mapPartition(new ReturnFirstTen())
  .sortPartition(1, Order.DESCENDING)
  .mapPartition(new ReturnFirstTen()).parallelism(1)

i.e. the second MapPartition has to be parallelism=1


On Tue, 24 Jan 2017 at 11:57 Fabian Hueske <fh...@gmail.com> wrote:

> You are of course right Gabor.
> @Ivan, you can use a heap in the MapPartitionFunction to collect the top
> 10 elements (note that you need to create deep-copies if object reuse is
> enabled [1]).
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/batch/index.html#operating-on-data-objects-in-functions
>
>
> 2017-01-24 11:49 GMT+01:00 Gábor Gévay <gg...@gmail.com>:
>
> Hello,
>
> Btw. there is a Jira about this:
> https://issues.apache.org/jira/browse/FLINK-2549
> Note that the discussion there suggests a more efficient approach,
> which doesn't involve sorting the entire partitions.
>
> And if I remember correctly, this question comes up from time to time
> on the mailing list.
>
> Best,
> Gábor
>
>
>
> 2017-01-24 11:35 GMT+01:00 Fabian Hueske <fh...@gmail.com>:
> > Hi Ivan,
> >
> > I think you can use MapPartition for that.
> > So basically:
> >
> > dataset // assuming some partitioning that can be reused to avoid a
> shuffle
> >   .sortPartition(1, Order.DESCENDING)
> >   .mapPartition(new ReturnFirstTen())
> >   .sortPartition(1, Order.DESCENDING).parallelism(1)
> >   .mapPartition(new ReturnFirstTen())
> >
> > Best, Fabian
> >
> >
> > 2017-01-24 10:10 GMT+01:00 Ivan Mushketyk <iv...@gmail.com>:
> >>
> >> Hi,
> >>
> >> I have a dataset of tuples with two fields ids and ratings and I need to
> >> find 10 elements with the highest rating in this dataset. I found a
> >> solution, but I think it's suboptimal and I think there should be a
> better
> >> way to do it.
> >>
> >> The best thing that I came up with is to partition dataset by rating,
> sort
> >> locally and write the partitioned dataset to disk:
> >>
> >> dataset
> >> .partitionCustom(new Partitioner<Double>() {
> >>   @Override
> >>   public int partition(Double key, int numPartitions) {
> >>     return key.intValue() % numPartitions;
> >>   }
> >> }, 1) . // partition by rating
> >> .setParallelism(5)
> >> .sortPartition(1, Order.DESCENDING) // locally sort by rating
> >> .writeAsText("..."); // write the partitioned dataset to disk
> >>
> >> This will store tuples in sorted files with names 5, 4, 3, ... that
> >> contain ratings in ranges (5, 4], (4, 3], and so on. Then I can read
> sorted
> >> data from disk and and N elements with the highest rating.
> >> Is there a way to do the same but without writing a partitioned dataset
> to
> >> a disk?
> >>
> >> I tried to use "first(10)" but it seems to give top 10 items from a
> random
> >> partition. Is there a way to get top N elements from every partition?
> Then I
> >> could locally sort top values from every partition and find top 10
> global
> >> values.
> >>
> >> Best regards,
> >> Ivan.
> >>
> >>
> >
>
>
>

Re: How to get top N elements in a DataSet?

Posted by Fabian Hueske <fh...@gmail.com>.
You are of course right Gabor.
@Ivan, you can use a heap in the MapPartitionFunction to collect the top 10
elements (note that you need to create deep-copies if object reuse is
enabled [1]).

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/batch/index.html#operating-on-data-objects-in-functions


2017-01-24 11:49 GMT+01:00 Gábor Gévay <gg...@gmail.com>:

> Hello,
>
> Btw. there is a Jira about this:
> https://issues.apache.org/jira/browse/FLINK-2549
> Note that the discussion there suggests a more efficient approach,
> which doesn't involve sorting the entire partitions.
>
> And if I remember correctly, this question comes up from time to time
> on the mailing list.
>
> Best,
> Gábor
>
>
>
> 2017-01-24 11:35 GMT+01:00 Fabian Hueske <fh...@gmail.com>:
> > Hi Ivan,
> >
> > I think you can use MapPartition for that.
> > So basically:
> >
> > dataset // assuming some partitioning that can be reused to avoid a
> shuffle
> >   .sortPartition(1, Order.DESCENDING)
> >   .mapPartition(new ReturnFirstTen())
> >   .sortPartition(1, Order.DESCENDING).parallelism(1)
> >   .mapPartition(new ReturnFirstTen())
> >
> > Best, Fabian
> >
> >
> > 2017-01-24 10:10 GMT+01:00 Ivan Mushketyk <iv...@gmail.com>:
> >>
> >> Hi,
> >>
> >> I have a dataset of tuples with two fields ids and ratings and I need to
> >> find 10 elements with the highest rating in this dataset. I found a
> >> solution, but I think it's suboptimal and I think there should be a
> better
> >> way to do it.
> >>
> >> The best thing that I came up with is to partition dataset by rating,
> sort
> >> locally and write the partitioned dataset to disk:
> >>
> >> dataset
> >> .partitionCustom(new Partitioner<Double>() {
> >>   @Override
> >>   public int partition(Double key, int numPartitions) {
> >>     return key.intValue() % numPartitions;
> >>   }
> >> }, 1) . // partition by rating
> >> .setParallelism(5)
> >> .sortPartition(1, Order.DESCENDING) // locally sort by rating
> >> .writeAsText("..."); // write the partitioned dataset to disk
> >>
> >> This will store tuples in sorted files with names 5, 4, 3, ... that
> >> contain ratings in ranges (5, 4], (4, 3], and so on. Then I can read
> sorted
> >> data from disk and and N elements with the highest rating.
> >> Is there a way to do the same but without writing a partitioned dataset
> to
> >> a disk?
> >>
> >> I tried to use "first(10)" but it seems to give top 10 items from a
> random
> >> partition. Is there a way to get top N elements from every partition?
> Then I
> >> could locally sort top values from every partition and find top 10
> global
> >> values.
> >>
> >> Best regards,
> >> Ivan.
> >>
> >>
> >
>

Re: How to get top N elements in a DataSet?

Posted by Gábor Gévay <gg...@gmail.com>.
Hello,

Btw. there is a Jira about this:
https://issues.apache.org/jira/browse/FLINK-2549
Note that the discussion there suggests a more efficient approach,
which doesn't involve sorting the entire partitions.

And if I remember correctly, this question comes up from time to time
on the mailing list.

Best,
Gábor



2017-01-24 11:35 GMT+01:00 Fabian Hueske <fh...@gmail.com>:
> Hi Ivan,
>
> I think you can use MapPartition for that.
> So basically:
>
> dataset // assuming some partitioning that can be reused to avoid a shuffle
>   .sortPartition(1, Order.DESCENDING)
>   .mapPartition(new ReturnFirstTen())
>   .sortPartition(1, Order.DESCENDING).parallelism(1)
>   .mapPartition(new ReturnFirstTen())
>
> Best, Fabian
>
>
> 2017-01-24 10:10 GMT+01:00 Ivan Mushketyk <iv...@gmail.com>:
>>
>> Hi,
>>
>> I have a dataset of tuples with two fields ids and ratings and I need to
>> find 10 elements with the highest rating in this dataset. I found a
>> solution, but I think it's suboptimal and I think there should be a better
>> way to do it.
>>
>> The best thing that I came up with is to partition dataset by rating, sort
>> locally and write the partitioned dataset to disk:
>>
>> dataset
>> .partitionCustom(new Partitioner<Double>() {
>>   @Override
>>   public int partition(Double key, int numPartitions) {
>>     return key.intValue() % numPartitions;
>>   }
>> }, 1) . // partition by rating
>> .setParallelism(5)
>> .sortPartition(1, Order.DESCENDING) // locally sort by rating
>> .writeAsText("..."); // write the partitioned dataset to disk
>>
>> This will store tuples in sorted files with names 5, 4, 3, ... that
>> contain ratings in ranges (5, 4], (4, 3], and so on. Then I can read sorted
>> data from disk and and N elements with the highest rating.
>> Is there a way to do the same but without writing a partitioned dataset to
>> a disk?
>>
>> I tried to use "first(10)" but it seems to give top 10 items from a random
>> partition. Is there a way to get top N elements from every partition? Then I
>> could locally sort top values from every partition and find top 10 global
>> values.
>>
>> Best regards,
>> Ivan.
>>
>>
>

Re: How to get top N elements in a DataSet?

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Ivan,

I think you can use MapPartition for that.
So basically:

dataset // assuming some partitioning that can be reused to avoid a shuffle
  .sortPartition(1, Order.DESCENDING)
  .mapPartition(new ReturnFirstTen())
  .sortPartition(1, Order.DESCENDING).parallelism(1)
  .mapPartition(new ReturnFirstTen())

Best, Fabian


2017-01-24 10:10 GMT+01:00 Ivan Mushketyk <iv...@gmail.com>:

> Hi,
>
> I have a dataset of tuples with two fields ids and ratings and I need to
> find 10 elements with the highest rating in this dataset. I found a
> solution, but I think it's suboptimal and I think there should be a better
> way to do it.
>
> The best thing that I came up with is to partition dataset by rating, sort
> locally and write the partitioned dataset to disk:
>
> dataset
> .partitionCustom(new Partitioner<Double>() {
>   @Override
>   public int partition(Double key, int numPartitions) {
>     return key.intValue() % numPartitions;
>   }
> }, 1) . // partition by rating
> .setParallelism(5)
> .sortPartition(1, Order.DESCENDING) // locally sort by rating
> .writeAsText("..."); // write the partitioned dataset to disk
>
> This will store tuples in sorted files with names 5, 4, 3, ... that
> contain ratings in ranges (5, 4], (4, 3], and so on. Then I can read sorted
> data from disk and and N elements with the highest rating.
> Is there a way to do the same but without writing a partitioned dataset to
> a disk?
>
> I tried to use "first(10)" but it seems to give top 10 items from a random
> partition. Is there a way to get top N elements from every partition? Then
> I could locally sort top values from every partition and find top 10 global
> values.
>
> Best regards,
> Ivan.
>
>
>