Posted to users@kafka.apache.org by EC Boost <ec...@gmail.com> on 2018/06/03 17:25:36 UTC

Kafka Streams Produced Wrong (duplicated) Results with Simple Windowed Aggregation Case

Hello Everyone,

I got duplicated results using Kafka Streams for a simple windowed aggregation.

The input event format is comma-separated, "event_id,event_type", and I
need to aggregate the events by event type.

Following is the Kafka Streams processing logic:

events
      // re-key each record from (null, "event_id,event_type") to (event_type, event_id)
      .map((k, v) -> {
        String[] parts = v.split(",");
        return KeyValue.pair(parts[1], parts[0]);
      })
      .groupByKey()
      // non-overlapping, gap-less 10-second tumbling windows
      .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(10)))
      .aggregate(
        ArrayList::new,
        (type, id, eventList) -> {
          eventList.add(id);
          return eventList;
        },
        Materialized.with(stringSerde, arraySerde)
      )
      // drop the window from the key, keeping only the event_type
      .toStream((k, v) -> k.key())
      .mapValues(v -> String.join(",", v))
      .to("ks-debug-output", Produced.with(stringSerde, stringSerde));


I produced the input messages using the following snippet:

require "kafka"

kafka = Kafka.new(["localhost:9092"], client_id: "event-producer")

f = File.open("events.txt")
f.each_line { |l|
  puts l
  kafka.deliver_message("#{l.strip}", topic: "ks-debug-input")
  sleep(3)
}



The messages in events.txt are the following (format: "event_id,event_type",
where each event_id is unique):

Input

1,t6
2,t1
3,t7
4,t5
5,t5
6,t6
7,t6
8,t4
9,t6
10,t7
11,t6
12,t5
13,t6
14,t4
15,t4
16,t2
17,t7
18,t6
19,t3
20,t7
21,t1
22,t5
23,t5
24,t6
25,t6
26,t4
27,t4
28,t3
29,t2
30,t5
31,t1
32,t1
33,t1
34,t1
35,t2
36,t4
37,t3
38,t3
39,t6
40,t6
41,t1
42,t4
43,t4
44,t6
45,t6
46,t7
47,t7
48,t3
49,t1
50,t6
51,t1
52,t4
53,t6
54,t7
55,t1
56,t1
57,t1
58,t5
59,t6
60,t7
61,t6
62,t4
63,t5
64,t1
65,t3
66,t1
67,t3
68,t3
69,t5
70,t1
71,t6
72,t5
73,t6
74,t1
75,t7
76,t5
77,t3
78,t1
79,t4
80,t3
81,t6
82,t2
83,t6
84,t2
85,t4
86,t7
87,t4
88,t6
89,t5
90,t6
91,t4
92,t3
93,t4
94,t6
95,t2
96,t2
97,t7
98,t4
99,t3
100,t3

<https://gist.github.com/stonegao/087fc0a06fc81177b452a651c16e81c2#output>

But I got the following output, with duplicate event_ids across windows:

Output

t6	1
t1	2
t7	3
t5	4
t5	4,5
t6	6
t6	6,7
t4	8
t6	9
t7	10
t6	9,11
t5	12
t6	13
t4	14
t4	14,15
t2	16
t7	17
t6	18
t3	19
t7	20
t1	21
t5	22
t5	22,23
t6	24
t6	24,25
t4	26
t4	26,27
t3	28
t2	29
t5	30
t1	31
t1	32
t1	32,33
t1	32,33,34
t2	35
t4	36
t3	37
t3	37,38
t6	39
t6	39,40
t1	41
t4	42
t4	42,43
t6	44
t6	44,45
t7	46
t7	46,47
t3	48
t1	49
t6	50
t1	49,51
t4	52
t6	53
t7	54
t1	55
t1	56
t1	56,57
t5	58
t6	59
t7	60
t6	59,61
t4	62
t5	63
t1	64
t3	65
t1	66
t3	67
t3	67,68
t5	69
t1	70
t6	71
t5	72
t6	73
t1	74
t7	75
t5	76
t3	77
t1	78
t4	79
t3	80
t6	81
t2	82
t6	83
t2	82,84
t4	85
t7	86
t4	87
t6	88
t5	89
t6	90
t4	91
t3	92
t4	93
t6	94
t2	95
t2	96
t7	97
t4	98
t3	99
t3	99,100



Since I am using non-overlapping, gap-less windows in the Kafka Streams
processing DSL, the correct output should NOT contain duplicate event_ids
across windows. Any ideas why the duplicates appear? (Link to the debug
project: https://github.com/westec/ks-aggregate-debug )

I appreciate your help!

Regards,
EC

Re: Kafka Streams Produced Wrong (duplicated) Results with Simple Windowed Aggregation Case

Posted by EC Boost <ec...@gmail.com>.
Thanks for your help.

For sliding windows, the changelog-as-output behaviour is as expected.
But for non-overlapping windows, most use cases expect micro-batch
semantics (no intermediate changelog as output, only the final aggregation
for each window). Are there any reference examples for implementing a
micro-batch-style window?
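
For the archive: the DSL operator that eventually shipped for exactly this
use case is suppress() (KIP-328, Apache Kafka 2.1), which was not yet
available at the time of this thread. A minimal sketch against the 2.1+ API,
reusing events, stringSerde and arraySerde from the original snippet:

import java.time.Duration;
import java.util.ArrayList;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

events
      .map((k, v) -> {
        String[] parts = v.split(",");
        return KeyValue.pair(parts[1], parts[0]);
      })
      .groupByKey()
      // the grace period bounds how long a window stays open for late events
      .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).grace(Duration.ofSeconds(5)))
      .aggregate(
        ArrayList::new,
        (type, id, eventList) -> { eventList.add(id); return eventList; },
        Materialized.with(stringSerde, arraySerde)
      )
      // forward only the final result per window, once the window closes
      .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
      .toStream((k, v) -> k.key())
      .mapValues(v -> String.join(",", v))
      .to("ks-debug-output", Produced.with(stringSerde, stringSerde));

untilWindowCloses holds every update back until stream time passes the window
end plus the grace period, so each window is emitted exactly once.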

Thanks,
EC


Re: Kafka Streams Produced Wrong (duplicated) Results with Simple Windowed Aggregation Case

Posted by John Roesler <jo...@confluent.io>.
Hi EC,

Thanks for the very clear report and question. Like Guozhang said this is
expected (but not ideal) behavior.

For an immediate work-around, you can try materializing the KTable and
setting the commit interval and cache size as discussed here (
https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/)
to reduce (but not eliminate) duplicates.
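
Concretely, that means something like the following when building the
streams configuration (the values here are illustrative, not recommendations;
tune them to your latency/duplicate trade-off):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// forward cached aggregation updates downstream at most once per commit interval
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10_000);
// a larger record cache absorbs more per-window updates before forwarding
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);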

I'm in the process of getting my thoughts in order to write a KIP to
address this exact use case. If you're interested in participating in the
discussion, you can keep an eye on the dev mailing list or watch the KIP
page. I can't say when exactly I'll start it. I want to get it out there
soon, but I also want to do my homework and have a good proposal.

Thanks,
-John


Re: Kafka Streams Produced Wrong (duplicated) Results with Simple Windowed Aggregation Case

Posted by Guozhang Wang <wa...@gmail.com>.
Hello,

Your observation is correct: by default, Kafka Streams emits continuous
updates for each window instead of waiting for the "final" update for each
window.

There is some ongoing work to provide functionality that lets users specify
something like "give me the final result for windowed aggregations" in the
DSL; it will probably come after the 2.0 release.

Guozhang



Re: Kafka Streams Produced Wrong (duplicated) Results with Simple Windowed Aggregation Case

Posted by EC Boost <ec...@gmail.com>.
I logged the internal window information:

Window{start=1528043030000, end=1528043040000} key=t6  1
Window{start=1528043040000, end=1528043050000} key=t1  2
Window{start=1528043040000, end=1528043050000} key=t7  3
Window{start=1528043040000, end=1528043050000} key=t5  4
Window{start=1528043040000, end=1528043050000} key=t5  4,5
Window{start=1528043050000, end=1528043060000} key=t6  6
Window{start=1528043050000, end=1528043060000} key=t6  6,7
Window{start=1528043050000, end=1528043060000} key=t4  8
Window{start=1528043060000, end=1528043070000} key=t6  9
Window{start=1528043060000, end=1528043070000} key=t7  10
Window{start=1528043060000, end=1528043070000} key=t6  9,11
Window{start=1528043070000, end=1528043080000} key=t5  12
Window{start=1528043070000, end=1528043080000} key=t6  13
Window{start=1528043070000, end=1528043080000} key=t4  14
Window{start=1528043070000, end=1528043080000} key=t4  14,15

....

It seems that Kafka Streams sends the whole KTable changelog as output, and
that's probably why there are duplicate outputs for gap-less, non-overlapping
windows.

Is there any way to achieve real mini-batch-style processing semantics with
non-overlapping windows, meaning that only the last value per window is sent
as output rather than all of its changelog updates?

