Posted to user@flink.apache.org by Yukun Guo <gy...@gmail.com> on 2016/06/07 02:26:25 UTC

Hourly top-k statistics of DataStream

Hi,

I'm working on a project which uses Flink to compute hourly log statistics
like top-K. The logs are fetched from Kafka by a FlinkKafkaConsumer and
packed into a DataStream.

The problem is, I find the computation quite challenging to express with
Flink's DataStream API:

1. If I use something like `logs.timeWindow(Time.hours(1))` and the data
volume is really high (e.g., billions of logs generated in one hour), will
the window grow too large to be handled efficiently?

2. We have to create a `KeyedStream` before applying `timeWindow`. However,
the distribution of some keys is skewed, so keying by them may hurt
performance through unbalanced partition loads. (What I want is just to
rebalance the stream across all partitions.)

3. The top-K algorithm can be straightforwardly implemented with `DataSet`'s
`mapPartition` and `reduceGroup` API as in
[FLINK-2549](https://github.com/apache/flink/pull/1161/), but this is not so
easy with the DataStream approach, even with the stateful operators. I still
cannot figure out how to reunion streams once they are partitioned.

4. Is it possible to convert a DataStream into a DataSet? If yes, how can I
make Flink analyze the data incrementally rather than aggregating the logs
for one hour before starting to process?

Re: Hourly top-k statistics of DataStream

Posted by Yukun Guo <gy...@gmail.com>.
My algorithm is roughly as follows, taking the top-K words problem as an
example (the purpose of computing the local “word count” is to deal with
data imbalance):

DataStream of words ->
timeWindow of 1h ->
converted to DataSet of words ->
random partitioning by rebalance ->
local “word count” using mapPartition ->
global “word count” using reduceGroup ->
rebalance ->
local top-K using mapPartition ->
global top-K using reduceGroup

Here is some (probably buggy) code to demonstrate the basic idea on DataSet:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.util.Comparator;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;

public class WordCount {

  // number of top entries to keep
  private static final int K = 3;

  public static void main(String[] args) throws Exception {

    // set up the execution environment; the parallelism has to be set
    // before the job is triggered
    final ExecutionEnvironment env =
        ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);

    // get input data
    DataSet<String> text = env.fromElements(
        "14159265358979323846264338327950288419716939937510",
        "58209749445923078164062862089986280348253421170679",
        "82148086513282306647093844609550582231725359408128",
        "48111745028410270193852110555964462294895493038196",
        "44288109756659334461284756482337867831652712019091",
        "45648566923460348610454326648213393607260249141273",
        "72458700660631558817488152092096282925409171536436",
        "78925903600113305305488204665213841469519415116094",
        "33057270365759591953092186117381932611793105118548",
        "07446237996274956735188575272489122793818301194912",
        "98336733624406566430860213949463952247371907021798",
        "60943702770539217176293176752384674818467669405132",
        "00056812714526356082778577134275778960917363717872",
        "14684409012249534301465495853710507922796892589235",
        "42019956112129021960864034418159813629774771309960",
        "51870721134999999837297804995105973173281609631859",
        "50244594553469083026425223082533446850352619311881",
        "71010003137838752886587533208381420617177669147303",
        "59825349042875546873115956286388235378759375195778",
        "18577805321712268066130019278766111959092164201989"
    );

    DataSet<Tuple2<String, Integer>> counts = text
        // split up the lines in pairs (2-tuples) containing: (word,1)
        .flatMap(new LineSplitter())
        .rebalance()
        // local word count
        .mapPartition(new LocalCount())
        // global word count
        .reduceGroup(new GlobalCount());

    DataSet<Tuple2<String, Integer>> topK = counts
        .rebalance()
        // local top-K
        .mapPartition(new LocalTopK())
        // global top-K
        .reduceGroup(new GlobalTopK());

    // print() triggers execution of a DataSet program, so no separate
    // env.execute() call is needed afterwards
    topK.print();
  }

  // sums the counts per word and emits one (word, count) pair per word
  private static void sumCounts(Iterable<Tuple2<String, Integer>> wordcounts,
                                Collector<Tuple2<String, Integer>> out) {
    SortedMap<String, Integer> m = new TreeMap<String, Integer>();
    for (Tuple2<String, Integer> wc : wordcounts) {
      Integer current = m.get(wc.f0);
      Integer updated = current == null ? wc.f1 : current + wc.f1;
      m.put(wc.f0, updated);
    }

    for (Map.Entry<String, Integer> e : m.entrySet()) {
      out.collect(Tuple2.of(e.getKey(), e.getValue()));
    }
  }

  // emits the K pairs with the highest counts, in ascending order of count
  private static void emitTopK(Iterable<Tuple2<String, Integer>> wordcounts,
                               Collector<Tuple2<String, Integer>> out) {
    // ordered by count, then by word, so that words with equal counts are
    // kept as separate entries (a map keyed by count alone would overwrite them)
    TreeSet<Tuple2<String, Integer>> topKSoFar =
        new TreeSet<Tuple2<String, Integer>>(new Comparator<Tuple2<String, Integer>>() {
          @Override
          public int compare(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
            int byCount = a.f1.compareTo(b.f1);
            return byCount != 0 ? byCount : a.f0.compareTo(b.f0);
          }
        });
    for (Tuple2<String, Integer> wc : wordcounts) {
      // copy the tuple in case object reuse is enabled
      topKSoFar.add(Tuple2.of(wc.f0, wc.f1));
      if (topKSoFar.size() > K) {
        topKSoFar.pollFirst();
      }
    }

    for (Tuple2<String, Integer> wc : topKSoFar) {
      out.collect(wc);
    }
  }

  public static final class LocalCount implements
      MapPartitionFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
    @Override
    public void mapPartition(Iterable<Tuple2<String, Integer>> words,
                             Collector<Tuple2<String, Integer>> out) {
      sumCounts(words, out);
    }
  }

  public static final class GlobalCount implements
      GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
    @Override
    public void reduce(Iterable<Tuple2<String, Integer>> wordcounts,
                       Collector<Tuple2<String, Integer>> out) {
      sumCounts(wordcounts, out);
    }
  }

  public static final class LocalTopK implements
      MapPartitionFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
    @Override
    public void mapPartition(Iterable<Tuple2<String, Integer>> wordcounts,
                             Collector<Tuple2<String, Integer>> out) {
      emitTopK(wordcounts, out);
    }
  }

  public static final class GlobalTopK implements
      GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
    @Override
    public void reduce(Iterable<Tuple2<String, Integer>> topList,
                       Collector<Tuple2<String, Integer>> out) {
      emitTopK(topList, out);
    }
  }

  public static final class LineSplitter implements
      FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
      // every single character counts as a "word" in this toy example
      String[] tokens = value.split("");

      for (String token : tokens) {
        if (token.length() > 0) {
          out.collect(new Tuple2<String, Integer>(token, 1));
        }
      }
    }
  }
}

Re: Hourly top-k statistics of DataStream

Posted by Philippe Caparroy <ph...@orange.fr>.
You should have a look at this project: https://github.com/addthis/stream-lib

You can use it within Flink, storing intermediate values in local state.
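
To give a feel for the kind of bounded-memory summary such a library provides, below is a minimal plain-Java sketch of the Space-Saving algorithm for approximate top-K. It is not stream-lib's code or API: the class `SpaceSavingSketch` and its methods `offer` and `topK` are hypothetical names chosen for this example, and the linear scan for the smallest counter is a simplification that a real implementation would avoid.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: approximate top-K that tracks at most `capacity` items. */
final class SpaceSavingSketch {
    private final int capacity;
    private final Map<String, Long> counters = new HashMap<String, Long>();

    SpaceSavingSketch(int capacity) {
        this.capacity = capacity;
    }

    /** Records one occurrence of the given item. */
    void offer(String item) {
        Long current = counters.get(item);
        if (current != null) {
            counters.put(item, current + 1);
        } else if (counters.size() < capacity) {
            counters.put(item, 1L);
        } else {
            // Evict the item with the smallest counter. The newcomer inherits
            // that counter plus one, so estimates can only be too high.
            String minItem = null;
            long minCount = Long.MAX_VALUE;
            for (Map.Entry<String, Long> e : counters.entrySet()) {
                if (e.getValue() < minCount) {
                    minCount = e.getValue();
                    minItem = e.getKey();
                }
            }
            counters.remove(minItem);
            counters.put(item, minCount + 1);
        }
    }

    /** Returns up to k (item, estimated count) pairs, highest estimate first. */
    List<Map.Entry<String, Long>> topK(int k) {
        List<Map.Entry<String, Long>> entries = new ArrayList<Map.Entry<String, Long>>();
        for (Map.Entry<String, Long> e : counters.entrySet()) {
            entries.add(new AbstractMap.SimpleImmutableEntry<String, Long>(e.getKey(), e.getValue()));
        }
        entries.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
        return new ArrayList<Map.Entry<String, Long>>(entries.subList(0, Math.min(k, entries.size())));
    }
}
```

One instance could be kept per parallel task for the current hour: call `offer(word)` for every record and `topK(k)` when the hour ends. Memory stays bounded by `capacity` no matter how many distinct words arrive, at the price of counts that may overestimate.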







Re: Hourly top-k statistics of DataStream

Posted by Christophe Salperwyck <ch...@gmail.com>.
Hi,

There are some algorithms that do this with a low memory footprint. Have a
look at the count-min sketch, for example; Java implementations are
available.
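
As a rough illustration of the idea, here is a minimal count-min sketch in plain Java. It is a sketch written for this thread, not one of the existing implementations: the class name `CountMinSketch`, its constructor parameters, and the bit-mixing hash are all assumptions made for the example.

```java
import java.util.Random;

/** Hypothetical helper: fixed-size frequency estimates that never undercount. */
final class CountMinSketch {
    private final int depth;
    private final int width;
    private final long[][] table;
    private final int[] seeds;

    CountMinSketch(int depth, int width, long seed) {
        this.depth = depth;
        this.width = width;
        this.table = new long[depth][width];
        this.seeds = new int[depth];
        Random rnd = new Random(seed);
        for (int row = 0; row < depth; row++) {
            seeds[row] = rnd.nextInt();
        }
    }

    /** Maps an item to a column of the given row. */
    private int bucket(String item, int row) {
        int h = item.hashCode() ^ seeds[row];
        // Mix the bits so that each row behaves like a different hash function.
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return Math.floorMod(h, width);
    }

    /** Adds `count` occurrences of the item to one cell in every row. */
    void add(String item, long count) {
        for (int row = 0; row < depth; row++) {
            table[row][bucket(item, row)] += count;
        }
    }

    /** The minimum over all rows is the estimate least inflated by collisions. */
    long estimate(String item) {
        long min = Long.MAX_VALUE;
        for (int row = 0; row < depth; row++) {
            min = Math.min(min, table[row][bucket(item, row)]);
        }
        return min;
    }
}
```

The memory used is `depth * width` counters regardless of the number of distinct items. The sketch only answers "how often did this item occur?", so for top-K it would have to be combined with a small set of current candidates whose estimates are compared as records arrive.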

Christophe


Re: Hourly top-k statistics of DataStream

Posted by Yukun Guo <gy...@gmail.com>.
Thank you very much for the detailed answer. Now I understand a DataStream
can be repartitioned or “joined” (don’t know the exact terminology) with
keyBy.

But another question:
Even though no exact incremental top-K algorithm exists, I’d like to
incrementally compute the local word count during the hour, probably using
a TreeMap for counting. As soon as the hour finishes, the TreeMap is
converted to a stream of Tuple2 and forwarded to the rest of the
computation. I’m concerned about the memory usage: the TreeMap and the
Tuple2 collection hold a huge number of items. Do I have to do some custom
memory management?

I’m also not sure whether a TreeMap is suitable here. This StackOverflow
question presents a similar approach:
http://stackoverflow.com/questions/34681887/how-apache-flink-deal-with-skewed-data,
but the suggested solution seems rather complicated.


Re: Hourly top-k statistics of DataStream

Posted by Jamie Grier <ja...@data-artisans.com>.
Suggestions in-line below...

On Mon, Jun 6, 2016 at 7:26 PM, Yukun Guo <gy...@gmail.com> wrote:

> Hi,
>
> I'm working on a project which uses Flink to compute hourly log statistics
> like top-K. The logs are fetched from Kafka by a FlinkKafkaConsumer and
> packed into a DataStream.
>
> The problem is, I find the computation quite challenging to express with
> Flink's DataStream API:
>
> 1. If I use something like `logs.timeWindow(Time.hours(1))` and the data
> volume is really high (e.g., billions of logs generated in one hour), will
> the window grow too large to be handled efficiently?
>

In the general case you can use:

    stream
        .timeWindow(...)
        .apply(reduceFunction, windowFunction)

which can take a ReduceFunction and a WindowFunction.  The ReduceFunction
is used to reduce the state on the fly and thereby keep the total state
size low.  This can commonly be used in analytics applications to reduce
the state size that you're accumulating for each window.  In the specific
case of TopK, however, you cannot do this if you want an exact result.  To
get an exact result I believe you have to actually keep around all of the
data and then calculate TopK at the end in your WindowFunction.  If you are
able to use approximate algorithms for your use case, then you can calculate
a probabilistic incremental TopK based on some sort of sketch-based
algorithm.
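
To make the exact variant concrete, here is a minimal plain-Java sketch of the step that would run once the window closes, assuming the per-word counts of the window have already been collected into a map. The class `ExactTopK` and its method are hypothetical names for this example, and no Flink API is involved; the same logic could be called from a WindowFunction.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/** Hypothetical helper: exact top-K over the complete counts of one window. */
final class ExactTopK {

    /** Returns the k entries with the largest counts, highest first; ties are ordered by key. */
    static List<Map.Entry<String, Long>> topK(Map<String, Long> counts, int k) {
        // "Weakest first": lower count first, and on equal counts the larger key first,
        // so the head of the heap is always the entry to drop next.
        Comparator<Map.Entry<String, Long>> weakestFirst = (a, b) -> {
            int byCount = Long.compare(a.getValue(), b.getValue());
            return byCount != 0 ? byCount : b.getKey().compareTo(a.getKey());
        };

        // A min-heap bounded to k entries: the working set stays at O(k)
        // even though every count is inspected once.
        PriorityQueue<Map.Entry<String, Long>> heap =
            new PriorityQueue<Map.Entry<String, Long>>(weakestFirst);
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            heap.add(new AbstractMap.SimpleImmutableEntry<String, Long>(e.getKey(), e.getValue()));
            if (heap.size() > k) {
                heap.poll();
            }
        }

        List<Map.Entry<String, Long>> result = new ArrayList<Map.Entry<String, Long>>(heap);
        result.sort(weakestFirst.reversed());
        return result;
    }
}
```

The heap only bounds the memory of the selection step. The map of counts itself still grows with the number of distinct words in the window, which is the cost of an exact answer.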

>
> 2. We have to create a `KeyedStream` before applying `timeWindow`. However,
> the distribution of some keys is skewed, so keying by them may hurt
> performance through unbalanced partition loads. (What I want is just to
> rebalance the stream across all partitions.)
>

A good and simple way to approach this may be to come up with a composite
key for your data that *is* uniformly distributed.  You can imagine
something simple like 'natural_key:random_number'.  Then keyBy(natural_key)
and reduce() again.  For example:

    stream
        .keyBy(key, rand())          // partition by a composite key that is uniformly distributed
        .timeWindow(Time.hours(1))
        .reduce()                    // pre-aggregation
        .keyBy(key)                  // repartition by the natural key
        .timeWindow(Time.hours(1))
        .reduce()                    // final aggregation
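
The two-stage idea can be checked outside Flink with a small plain-Java simulation. This is only a sketch under assumptions: the class `SaltedAggregation`, its method names, and the `key:salt` string encoding are made up for the example, and counting stands in for whatever reduce function is actually used.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/** Hypothetical helper: simulates pre-aggregation on a salted key followed by a final merge. */
final class SaltedAggregation {

    /** Stage 1: count per "key:salt", so a hot key is spread over up to `salts` groups. */
    static Map<String, Long> preAggregate(List<String> keys, int salts, long seed) {
        Random rnd = new Random(seed);
        Map<String, Long> partial = new HashMap<String, Long>();
        for (String key : keys) {
            String composite = key + ":" + rnd.nextInt(salts);
            partial.merge(composite, 1L, Long::sum);
        }
        return partial;
    }

    /** Stage 2: strip the salt and sum the partial counts per natural key. */
    static Map<String, Long> finalAggregate(Map<String, Long> partial) {
        Map<String, Long> totals = new HashMap<String, Long>();
        for (Map.Entry<String, Long> e : partial.entrySet()) {
            String composite = e.getKey();
            String key = composite.substring(0, composite.lastIndexOf(':'));
            totals.merge(key, e.getValue(), Long::sum);
        }
        return totals;
    }
}
```

The second stage sees at most `salts` partial results per natural key instead of every record, which is what removes the skew. In a real job the salt would presumably be attached to each record in a map step before the first keyBy, since the key extracted from a record should be deterministic.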


>
> 3. The top-K algorithm can be straightforwardly implemented with
> `DataSet`'s `mapPartition` and `reduceGroup` API as in
> [FLINK-2549](https://github.com/apache/flink/pull/1161/), but this is not
> so easy with the DataStream approach, even with the stateful operators. I
> still cannot figure out how to reunion streams once they are partitioned.
>

I'm not sure I know what you're trying to do here.  What do you mean by
re-union?


> 4. Is it possible to convert a DataStream into a DataSet? If yes, how can I
> make Flink analyze the data incrementally rather than aggregating the logs
> for one hour before starting to process?
>

There is no direct way to turn a DataStream into a DataSet.  I addressed
the point about doing the computation incrementally above, though.  You do
this with a ReduceFunction.  But again, there doesn't exist an exact
incremental TopK algorithm that I'm aware of.  This can be done with
sketching, though.


-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
jamie@data-artisans.com