Posted to users@kafka.apache.org by Kiran K <em...@gmail.com> on 2019/11/22 11:50:16 UTC

Re: Suppress DSL operator in stream api - 2.4.0

Hi Matthias,

When we say 2.4.0, we simply mean that we built Kafka from source and labeled it 2.4.0 for our internal reference.

But I have also tested with the latest release, Kafka 2.3.1, and the issue still persists.

Pseudo code:
.filter((key, value) -> ...)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(1)))
.aggregate(.......)
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))

Given this topology, suppose we use event time and ingest the following events:

msg1 @ 11:00
msg2 @ 11:01
msg3 @ 11:02
msg4 @ 11:03

Shouldn't I then see the result for msg1 emitted when msg3 is ingested into the topic, or, at worst, once I ingest msg4? I don't see anything getting emitted. I even tried ingesting a record 10 minutes later, and still nothing gets emitted.
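Kiran's expectation can be checked with a small model. The sketch below is hypothetical (it is not Kafka's code) and assumes the final-results rule that a window [start, end) may be emitted once stream time, the largest record timestamp seen so far, reaches end + grace, and that a record whose window has already passed that point is dropped as late. Timestamps are minutes since midnight (11:00 = 660) rather than epoch milliseconds, and a simple count stands in for the elided `aggregate(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model, NOT Kafka's implementation: a tumbling-window count followed by
// suppress(untilWindowCloses(...)). Time unit is minutes since midnight.
class SuppressModel {
    private final long size;   // window size
    private final long grace;  // grace period
    private long streamTime = Long.MIN_VALUE;                    // largest timestamp seen so far
    private final TreeMap<Long, Long> buffer = new TreeMap<>();  // window start -> count

    SuppressModel(long size, long grace) {
        this.size = size;
        this.grace = grace;
    }

    // Processes one record; returns the final results it releases as "windowStart:count".
    List<String> process(long timestamp) {
        streamTime = Math.max(streamTime, timestamp);            // stream time never goes backwards
        long start = timestamp - Math.floorMod(timestamp, size); // align to the tumbling window
        if (start + size + grace > streamTime) {
            buffer.merge(start, 1L, Long::sum);                  // window still open: update it
        }                                                        // otherwise the record is late and dropped
        List<String> emitted = new ArrayList<>();
        // A window is final once stream time >= windowEnd + grace.
        while (!buffer.isEmpty() && buffer.firstKey() + size + grace <= streamTime) {
            Map.Entry<Long, Long> e = buffer.pollFirstEntry();
            emitted.add(e.getKey() + ":" + e.getValue());
        }
        return emitted;
    }

    public static void main(String[] args) {
        SuppressModel model = new SuppressModel(1, 1); // 1-minute windows, 1-minute grace
        long[] msgs = {660, 661, 662, 663};            // msg1..msg4 at 11:00..11:03
        for (int i = 0; i < msgs.length; i++) {
            System.out.println("msg" + (i + 1) + " -> " + model.process(msgs[i]));
        }
    }
}
```

Running `main` prints empty lists for msg1 and msg2, `[660:1]` for msg3 and `[661:1]` for msg4. Under these assumptions the 11:00 window closes exactly when stream time reaches 11:02, as Kiran expected. If `grace()` is omitted, Kafka 2.x falls back to a default grace period of roughly 24 hours (one day minus the window size), so a model such as `new SuppressModel(1, 1439)` emits nothing for these four records.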

Thanks,
Kiran

On 2019/10/24 05:07:02, "Matthias J. Sax" <ma...@confluent.io> wrote: 
> Did you try to test your code using `TopologyTestDriver`? Maybe this
> helps to figure out the root cause of the issue.
> 
> We have many unit/integration tests in place and many people use
> suppress() successfully in production. Hence, I am sure it basically
> works -- of course, there might still be an unknown bug...
> 
> One side question: the subject of this email thread says "2.4.0", but
> Kafka 2.4.0 is not released yet. Hence, I am wondering which version you
> are actually using.
> 
> 
> 
> -Matthias
> 
> On 9/25/19 4:53 PM, Thameem Ansari wrote:
> > I tried your suggestions but am unable to get suppress to emit anything. I can see that the SUPPRESS_STORES are created on the Kafka nodes, but nothing gets output.
> > It looks like the grace period and window closing are not honored for some reason. I can see a lot of people having difficulty getting suppress to work.
> > My window size is one minute, and I tried both with and without a grace period. The event times are in the past because I am feeding test data, but even if I post data with progressively increasing event times, in order, nothing happens.
> > Any help is appreciated.
> > 
> > Thanks. 
> > 
> >> On Sep 11, 2019, at 10:50 PM, Alessandro Tagliapietra <ta...@gmail.com> wrote:
> >>
> >> Have you tried deleting the suppress changelog topic to see if you get
> >> something after deleting it?
> >>
> >> By "per topic and not per key" I mean that if, for example, you send an event
> >> with key 1 and a timestamp equal to today's date, that moves the time
> >> forward for every key, so data in the past with key 2 won't go through.
> >>
> >> On Wed, Sep 11, 2019, 8:45 PM Thameem Ansari <th...@gmail.com> wrote:
> >>
> >>> I tried different timestamps in the near past, but nothing comes out.
> >>> I went through the article from Confluent about using suppress, but I don’t
> >>> see many people being successful with it.
> >>>
> >>> What do you mean by “timestamp is per topic and not per key”? Can you
> >>> please elaborate?
> >>>
> >>>
> >>>
> >>>
> >>>> On Sep 11, 2019, at 10:13 PM, Alessandro Tagliapietra <tagliapietra.alessandro@gmail.com> wrote:
> >>>>
> >>>> Did you ever push any data with a greater timestamp than the current one
> >>>> you're producing?
> >>>> One thing that took me a while to find out is that the suppress timestamp is
> >>>> per topic and not per key.
> >>>>
> >>>> --
> >>>> Alessandro Tagliapietra
> >>>>
> >>>>
> >>>> On Wed, Sep 11, 2019 at 8:06 PM Thameem Ansari <th...@gmail.com> wrote:
> >>>>
> >>>>> Yes, I am able to see the output when I remove suppress.
> >>>>>
> >>>>>
> >>>>>> On Sep 11, 2019, at 9:58 PM, Matthias J. Sax <ma...@confluent.io> wrote:
> >>>>>>
> >>>>>> Hard to say. Do you see output if you remove `suppress()` from your
> >>>>>> topology?
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>> On 9/11/19 6:19 PM, Thameem Ansari wrote:
> >>>>>>> I am using a producer simulator to generate events in the past, and
> >>>>>>> I can see that my time advances; the topology is based on event time.
> >>>>>>> But even if I run the producer for a few hours, nothing gets emitted. Is
> >>>>>>> there any way to debug this issue?
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>> On Sep 11, 2019, at 6:13 PM, Matthias J. Sax <ma...@confluent.io> wrote:
> >>>>>>>>
> >>>>>>>> Note that `suppress()` is event-time based and does not emit any
> >>>>>>>> data if event time does not advance.
> >>>>>>>>
> >>>>>>>> A common misunderstanding is that people stop sending data and expect
> >>>>>>>> to see a result after some time, but that is not how it works. If you
> >>>>>>>> stop sending data, event time cannot advance, and thus suppress() will
> >>>>>>>> never emit anything downstream.
> >>>>>>>>
> >>>>>>>> Also see this blog post about `suppress`:
> >>>>>>>>
> >>>>>>>> https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 9/10/19 9:52 PM, Thameem Ansari wrote:
> >>>>>>>>> In my streaming topology, I am using the suppress DSL operator. According
> >>>>>>>>> to the documentation, it is supposed to output the final results after the
> >>>>>>>>> window closes. But I noticed it's not emitting anything at all. Here is the
> >>>>>>>>> pseudo code of my topology:
> >>>>>>>>>
> >>>>>>>>> .filter((key, value) -> ...)
> >>>>>>>>> .flatMap((key, value) -> {
> >>>>>>>>> ...
> >>>>>>>>> })
> >>>>>>>>> .groupByKey(Grouped.with(Serdes.String(), ...))
> >>>>>>>>> .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(1)))
> >>>>>>>>> .aggregate(
> >>>>>>>>>   ...
> >>>>>>>>> ).suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
> >>>>>>>>>
> >>>>>>>>> Is anything wrong here?
> >>>>>>>>>
> >>>>>>>>> Thanks
> >>>>>>>>> Thameem
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>
> >>>
> > 
> 
> 
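Alessandro's remark that the suppress timestamp is "per topic and not per key" can be illustrated with a small model. Kafka Streams tracks stream time per task; with a single input topic that means per partition, so a newer record advances the clock for every key on its partition but not for keys on other partitions, and if no new records arrive on a partition, nothing buffered there is ever emitted (Matthias's point above). The sketch below is hypothetical, not Kafka code: partitions are assigned by hand instead of by the producer's partitioner, and times are minutes since midnight.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, NOT Kafka's implementation: every partition (task) keeps its own
// stream time, shared by all keys on that partition. Time unit is minutes since midnight.
class PerPartitionStreamTime {
    private final long size;
    private final long grace;
    private final Map<Integer, Long> streamTime = new HashMap<>();
    // partition -> buffered windows ("key@windowStart" -> windowStart)
    private final Map<Integer, Map<String, Long>> buffered = new HashMap<>();

    PerPartitionStreamTime(long size, long grace) {
        this.size = size;
        this.grace = grace;
    }

    // Processes one record on the given partition; returns the windows it closes there.
    List<String> process(int partition, String key, long timestamp) {
        long now = Math.max(streamTime.getOrDefault(partition, Long.MIN_VALUE), timestamp);
        streamTime.put(partition, now); // only this partition's clock moves
        long start = timestamp - Math.floorMod(timestamp, size);
        Map<String, Long> buf = buffered.computeIfAbsent(partition, p -> new LinkedHashMap<>());
        if (start + size + grace > now) {
            buf.put(key + "@" + start, start); // buffer the (key, window) result
        }
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = buf.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() + size + grace <= now) { // closed by this partition's stream time
                emitted.add(e.getKey());
                it.remove();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        PerPartitionStreamTime model = new PerPartitionStreamTime(1, 1);
        System.out.println(model.process(0, "a", 660)); // key a, 11:00, partition 0
        System.out.println(model.process(1, "b", 665)); // key b, 11:05, partition 1
        System.out.println(model.process(0, "c", 665)); // key c, 11:05, partition 0
    }
}
```

The three calls print `[]`, `[]` and `[a@660]`: key b's 11:05 record on partition 1 does not close key a's 11:00 window, while key c's record on partition 0 does, even though c is a different key.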

Re: Suppress DSL operator in stream api - 2.4.0

Posted by "Matthias J. Sax" <ma...@confluent.io>.
What you say makes sense to me. I am not aware of any known bugs; hence,
it seems you are hitting an unknown bug.

Can you reproduce it reliably? Maybe you can share the code and input
data set so we can have a look. Ideally, we could reproduce it
via TopologyTestDriver.


-Matthias
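A reproduction along the lines Matthias suggests might look like the sketch below. It is written against the Kafka Streams 2.3 APIs (`kafka-streams` plus `kafka-streams-test-utils`, using `ConsumerRecordFactory`, which newer releases deprecate) and rests on several assumptions: the topic names `input`/`output` and the key `key` are hypothetical, `count()` stands in for the elided `aggregate(...)`, every record uses the same key as in Kiran's scenario, and record caching is disabled so each update reaches `suppress()` immediately. Under the window-close rule discussed in this thread, the 11:00 window should be emitted when msg3 (11:02) arrives and the 11:01 window when msg4 arrives. If a test like this passes while the deployed application stays silent, differences in partitioning, timestamps, or configuration would be natural next suspects.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

// Sketch against the 2.3 APIs; topic names, the key and count() are assumptions.
class SuppressRepro {

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(1)))
            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>with(Serdes.String(), Serdes.Long()))
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            // Flatten Windowed<String> into "key@windowStartMs" so a plain String serde works.
            .toStream((windowedKey, count) -> windowedKey.key() + "@" + windowedKey.window().start())
            .to("output", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    // Pipes one record per offset (minutes after 11:00 UTC) and returns "key@start=count" lines.
    static List<String> run(long... minutesAfterEleven) {
        Properties props = new Properties();
        // Fresh application id per run so no local state is shared between runs.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "suppress-repro-" + UUID.randomUUID());
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the driver
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);   // forward every update immediately
        ConsumerRecordFactory<String, String> factory =
            new ConsumerRecordFactory<>("input", new StringSerializer(), new StringSerializer());
        long base = Instant.parse("2019-11-22T11:00:00Z").toEpochMilli();
        List<String> results = new ArrayList<>();
        TopologyTestDriver driver = new TopologyTestDriver(buildTopology(), props);
        try {
            for (long m : minutesAfterEleven) {
                driver.pipeInput(factory.create("input", "key", "msg", base + Duration.ofMinutes(m).toMillis()));
            }
            ProducerRecord<String, Long> out;
            while ((out = driver.readOutput("output", new StringDeserializer(), new LongDeserializer())) != null) {
                results.add(out.key() + "=" + out.value());
            }
        } finally {
            driver.close();
        }
        return results;
    }

    public static void main(String[] args) {
        // Kiran's msg1..msg4 at 11:00, 11:01, 11:02 and 11:03
        run(0, 1, 2, 3).forEach(System.out::println);
    }
}
```

Running `run(0)` or `run(0, 1)` should produce no output, because stream time has not yet reached the first window's end plus grace.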
