Posted to user@flink.apache.org by Mohit Anchlia <mo...@gmail.com> on 2017/08/15 23:21:56 UTC
Avoiding duplicates in joined stream
What's the best way to avoid duplicates in a joined stream? In the code below I
get duplicate "A" results because fileInput3 contains "A" more than once.
// fileInput3 contains "A" twice
SingleOutputStreamOperator<String> fileInput3 = streamEnv.fromElements("A", "A")
        .assignTimestampsAndWatermarks(timestampAndWatermarkAssigner1);

// Join on the two key selectors within 3-second tumbling event-time windows
fileInput1.join(fileInput3).where(keySelector1).equalTo(keySelector2)
        .window(TumblingEventTimeWindows.of(Time.milliseconds(3000)))
        .apply(function).print();
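Where the duplicates come from: a window join pairs every element of one input with every matching element of the other input that falls into the same window, and calls the join function once per pair. The sketch below is a plain-Java model of that behaviour for a single key in a single window, not Flink code. It assumes, hypothetically, that fileInput1 holds one "A" and that both "A" elements of fileInput3 land in the same 3000 ms window; WindowJoinModel and joinWithinWindow are made-up names for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Plain-Java model of a window join for ONE key in ONE window (not Flink code).
// Hypothetical helper, for illustration only.
class WindowJoinModel {

    // Pairs every left element with every right element whose key matches and
    // produces one result per pair, mirroring how the join function is called once per pair.
    static List<String> joinWithinWindow(List<String> left, List<String> right) {
        List<String> joined = new ArrayList<>();
        for (String l : left) {
            for (String r : right) {
                // Stands in for where(keySelector1).equalTo(keySelector2), using the value itself as the key.
                if (Objects.equals(l, r)) {
                    // Stands in for the user's join function; here it just concatenates both sides.
                    joined.add(l + "+" + r);
                }
            }
        }
        return joined;
    }
}
```

With one "A" on the left and two on the right, the model produces two joined results; with the duplicate removed from the right side before the join, it produces one.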
Re: Avoiding duplicates in joined stream
Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
The problem with de-duplication in a streaming pipeline is that you either need to keep all data you have ever seen or do the de-duplication only within a window. You can do the former by writing a keyed FlatMap operation that keeps state and emits an incoming element only if it hasn't been seen before. Something like this:
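DataStream input = ...;
DataStream deduped = input
    .keyBy(new MyKeySelector())         // partition by the field that defines a duplicate
    .flatMap(new MyDedupingFlatMap());  // stateful: emits an element only the first time its key arrives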
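A minimal sketch of what MyDedupingFlatMap has to do, written as plain Java so it runs without Flink. It is a model, not Flink API code: the HashSet of seen keys stands in for Flink's keyed state. In an actual Flink job this would typically be a RichFlatMapFunction that obtains a ValueState<Boolean> via getRuntimeContext().getState(...) and emits the element only while that per-key state is still empty. KeyedDedupModel, flatMap and process here are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Plain-Java model of a keyed "deduping FlatMap" (not Flink code; hypothetical names).
class KeyedDedupModel<T, K> {
    private final Function<T, K> keySelector;         // stands in for MyKeySelector
    private final Set<K> seenKeys = new HashSet<>();  // stands in for per-key state; never cleared

    KeyedDedupModel(Function<T, K> keySelector) {
        this.keySelector = keySelector;
    }

    // Called once per incoming element, like FlatMapFunction.flatMap(value, out).
    // Emits the element only the first time its key is seen.
    void flatMap(T value, List<T> out) {
        K key = keySelector.apply(value);
        if (seenKeys.add(key)) {  // add() returns false if the key was already present
            out.add(value);
        }
    }

    // Convenience: push a finite batch of elements through the operator.
    List<T> process(List<T> input) {
        List<T> out = new ArrayList<>();
        for (T value : input) {
            flatMap(value, out);
        }
        return out;
    }
}
```

Because the set of seen keys is never cleared, state grows with the number of distinct keys; this is the cost of keeping all data you ever saw.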
Alternatively, you could do the de-duplication per window using .keyBy().window().reduce() (or .apply()).
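The windowed variant only removes duplicates that fall into the same window. The plain-Java model below (not Flink code) mimics a keyBy(...).window(...).reduce(...) over 3000 ms tumbling event-time windows whose reduce function keeps the first element: each element is assigned to a window by its timestamp, assuming a zero window offset and non-negative timestamps, and only the first element per key and window survives. WindowDedupModel, Event and dedupPerWindow are hypothetical names, and the model ignores watermarks and the moment Flink actually fires each window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Plain-Java model (not Flink code) of de-duplication per tumbling event-time window.
// All names here are hypothetical.
class WindowDedupModel {

    // Hypothetical event: a value plus its event-time timestamp in milliseconds.
    static final class Event {
        final String value;
        final long timestamp;

        Event(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Start of the tumbling window containing ts, assuming a zero offset and ts >= 0.
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    static List<Event> dedupPerWindow(List<Event> input, Function<Event, String> key, long sizeMs) {
        // Reduce function: keep the element that arrived first, drop later duplicates.
        BinaryOperator<Event> keepFirst = (first, later) -> first;
        // One slot per (window start, key); LinkedHashMap keeps first-arrival order.
        Map<List<Object>, Event> slots = new LinkedHashMap<>();
        for (Event e : input) {
            List<Object> slot = Arrays.<Object>asList(windowStart(e.timestamp, sizeMs), key.apply(e));
            slots.merge(slot, e, keepFirst);
        }
        return new ArrayList<>(slots.values());
    }
}
```

An "A" at timestamp 3100 falls into the next 3000 ms window and is emitted again, which is the trade-off against the unbounded state of the keyed FlatMap approach.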
Best,
Aljoscha