Posted to user@flink.apache.org by Saiph Kappa <sa...@gmail.com> on 2016/01/11 19:41:19 UTC

How to sort tuples in DataStream

Hi,

I'm trying to write a simple Flink streaming application that counts the top N
words per window, but I don't know how to sort the words by their
frequency in Flink.

In Spark Streaming, I would do something like this:
«
val isAscending = true
stream.reduceByKeyAndWindow(reduceFunc, Seconds(10), Seconds(10))
  .transform(_.map(_.swap).sortByKey(isAscending)) // swap to (count, word) so the sort is by frequency, not by word
  .map(_._2)
»

How can I do it in Flink Stream?

This is what I have so far:
«
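import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Sums the aggregation field of two records and keeps the other fields,
// so the result can be combined again with the next record in the window.
// Separator must be a regex matching '|' (e.g. "\\|"): String.split takes a regex.
val reduceFunc = (a: String, b: String) => {
  val aggIndex = params.aggParams.get.head.aggIndex
  val aElems = a.split(Separator)
  val bElems = b.split(Separator)
  // Index the split fields; a(i) on the raw string would return a single Char
  aElems(aggIndex) = (aElems(aggIndex).toInt + bElems(aggIndex).toInt).toString
  aElems.mkString("|")
}

// keyBy(0) only works on tuple types; a DataStream[String] needs a key selector.
// Assumption: the word is the first field of each record.
stream.keyBy(_.split(Separator)(0))
  .timeWindow(Time.seconds(10), Time.seconds(10))
  .reduce(reduceFunc)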
»


My stream is just a series of strings like "field1|field2|field3|..."

Thanks.

Re: How to sort tuples in DataStream

Posted by Stephan Ewen <se...@apache.org>.
Hi!

Since a stream is infinite, you cannot simply sort it (Flink does not
follow the mini-batch model). You can only sort within windows.
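To make "sort within windows" concrete, here is a minimal plain-Scala sketch (no Flink API involved) of what a tumbling time window does: each timestamped element is assigned to a window by its start time, and values are sorted only inside that window, never across window boundaries. The names `TumblingSort`, `windowStart`, and `sortPerWindow` are hypothetical, and timestamps are assumed to be non-negative epoch milliseconds.

```scala
object TumblingSort {
  // Window start for a non-negative timestamp: round down to a multiple of sizeMillis.
  def windowStart(ts: Long, sizeMillis: Long): Long = ts - ts % sizeMillis

  // Group (timestampMillis, value) events by tumbling window and sort the
  // values inside each window. Nothing is sorted across window boundaries.
  def sortPerWindow(events: Seq[(Long, Int)], sizeMillis: Long): Map[Long, Seq[Int]] =
    events
      .groupBy { case (ts, _) => windowStart(ts, sizeMillis) }
      .map { case (start, evs) => start -> evs.map(_._2).sorted }

  def main(args: Array[String]): Unit = {
    val events = Seq((1000L, 3), (2000L, 1), (12000L, 5), (11000L, 2))
    println(sortPerWindow(events, 10000L))
  }
}
```

With 10-second windows, the two events before 10000 ms are sorted together and the two after it form a separate sorted group.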

I assume you key by word and sum up the counts. Since you want to get the
most frequent words, you need to sort across keys, which you can do
in a windowAll() function. Because the sort is global, this will end up
being a non-parallel step.
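A minimal sketch of that global sort, in plain Scala: given the per-word sums of one window as (word, count) pairs, sort by count, highest first, and keep the first N. `TopNSort.topN` is a hypothetical helper standing in for the body of the windowAll() function; it is not a Flink API, and how the pairs are collected from the window is not shown.

```scala
object TopNSort {
  // Sort the (word, count) sums of one window by count, highest first, and
  // keep the first n. sortBy is stable, so ties keep their input order.
  def topN(windowCounts: Seq[(String, Int)], n: Int): Seq[(String, Int)] =
    windowCounts.sortBy(_._2)(Ordering[Int].reverse).take(n)

  def main(args: Array[String]): Unit = {
    val counts = Seq("flink" -> 4, "spark" -> 2, "stream" -> 7, "window" -> 3)
    println(topN(counts, 2)) // List((stream,7), (flink,4))
  }
}
```

This sorts every distinct word of the window, which costs O(k log k) for k distinct words even though only N are kept.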

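A more efficient variant is to keep a heap bounded to N elements in the
windowAll() function: update it with each new element and emit its contents
at the end of the window. To retain the N most frequent words, this is
typically a min-heap on the count, so the least frequent candidate can be
evicted cheaply. A fold() function should allow you to implement that.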
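A sketch of the bounded-heap variant, again in plain Scala and assuming (word, count) pairs as input. The insert step has the shape of a fold: it adds a pair and evicts the least frequent one once the heap holds more than N elements, so memory stays at N + 1 entries regardless of how many distinct words the window contains. The object and method names are hypothetical, and wiring this into Flink's fold() is not shown.

```scala
import scala.collection.mutable

object TopNHeap {
  type WordCount = (String, Int)

  // Scala's PriorityQueue dequeues the largest element under its ordering.
  // Reversing the by-count ordering makes it a min-heap on count: the head is
  // the least frequent word kept so far, i.e. the one to evict.
  private val byCountAscending: Ordering[WordCount] =
    Ordering.by[WordCount, Int](_._2).reverse

  // A fresh heap per window (the queue is mutable, so never share one).
  def emptyHeap: mutable.PriorityQueue[WordCount] =
    mutable.PriorityQueue.empty[WordCount](byCountAscending)

  // One fold step: add the pair, then evict the least frequent if over n.
  def insert(heap: mutable.PriorityQueue[WordCount], wc: WordCount, n: Int): mutable.PriorityQueue[WordCount] = {
    heap.enqueue(wc)
    if (heap.size > n) heap.dequeue()
    heap
  }

  // At the end of the window: return the kept pairs, most frequent first.
  def emit(heap: mutable.PriorityQueue[WordCount]): List[WordCount] =
    heap.toList.sortBy(_._2)(Ordering[Int].reverse)

  def main(args: Array[String]): Unit = {
    val windowCounts = Seq("a" -> 5, "b" -> 1, "c" -> 7, "d" -> 3, "e" -> 6)
    val heap = windowCounts.foldLeft(emptyHeap)((h, wc) => insert(h, wc, 3))
    println(emit(heap)) // List((c,7), (e,6), (a,5))
  }
}
```

Each insert costs O(log N), and only the N survivors are sorted when the window is emitted.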

BTW: It is probably also more efficient to parse the Strings into numbers
once at the beginning of the program.
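A minimal sketch of parsing each line once into a typed record, assuming (hypothetically) that the word is field 0 and the count sits at position `countIndex` of a '|'-separated line. After this step, the window reduction is a plain integer addition instead of repeated string splitting and re-joining. `WordCount`, `parse`, and `sum` are illustrative names only.

```scala
object ParseOnce {
  // Hypothetical typed record: parse once, then aggregate on Ints.
  final case class WordCount(word: String, count: Int)

  // split('|') is the Char overload, which matches '|' literally;
  // split("|") would be read as a regex and split between every character.
  def parse(line: String, countIndex: Int): WordCount = {
    val fields = line.split('|')
    WordCount(fields(0), fields(countIndex).trim.toInt)
  }

  // The window reduction becomes a plain integer addition.
  def sum(a: WordCount, b: WordCount): WordCount = a.copy(count = a.count + b.count)

  def main(args: Array[String]): Unit = {
    val a = parse("flink|x|3", countIndex = 2)
    val b = parse("flink|y|4", countIndex = 2)
    println(sum(a, b)) // WordCount(flink,7)
  }
}
```

In a Flink job, these functions would plausibly be used as a map() before keyBy(_.word) and as the window's reduce(), but that wiring is an assumption, not something the thread spells out.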

Stephan
