You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Caleb Welton <ca...@autonomic.ai> on 2017/06/14 21:53:29 UTC

Dropped messages in kstreams?

I have a topology of
    KStream -> KTable -> KStream

```

final KStreamBuilder builder = new KStreamBuilder();
final KStream<String, Metric> metricStream = builder.stream(ingestTopic);
final KTable<Windowed<String>, MyThing> myTable = metricStream
        .groupByKey(stringSerde, mySerde)
        .reduce(MyThing::merge,

TimeWindows.of(10000).advanceBy(10000).until(Duration.ofDays(retentionDays).toMillis()),
                tableTopic);

myTable.toStream()
        .map((key, value) -> { return (KeyValue.pair(key.key(),
value.finalize(key.window()))); })
        .to(stringSerde, mySerde, sinkTopic);

```


Normally went sent data at 10x a second I expect ~1 output metric for every
100 metrics it receives, based on the 10 second window width.

When fed data real time at that rate it seems to do just that.

However when I either reprocess on an input topic with a large amount of
data or feed data in significantly faster I see a very different behavior.

Over the course of 20 seconds I can see 1,440,000 messages being ingested
into the ktable, but only 633 emitted from it (expected 14400).

Over the next minute the records output creeps to 1796, but then holds
steady and does not keep going up to the expected total of 14400.

A consumer reading from the sinkTopic ends up finding about 1264, which is
lower than the 1796 records I would have anticipated from the number of
calls into the final kstream map function.

Precise number of emitted records will vary from one run to the next.

Where are the extra metrics going?  Is there some commit issue that is
causing dropped messages if the ktable producer isn't able to keep up?

Any recommendations on where to focus the investigation of the issue?

Running Kafka 0.10.2.1.

Thanks,
  Caleb

Re: Dropped messages in kstreams?

Posted by Caleb Welton <ca...@autonomic.ai>.
Short answer seems to be that my Kafka LogRetentionTime was such that the
metrics I was writing were getting purged from kafka during the test.
Dropped metrics.

On Thu, Jun 15, 2017 at 1:32 PM, Caleb Welton <ca...@autonomic.ai> wrote:

> I have encapsulated the repro into a small self contained project:
> https://github.com/cwelton/kstreams-repro
>
> Thanks,
>   Caleb
>
>
> On Thu, Jun 15, 2017 at 11:30 AM, Caleb Welton <ca...@autonomic.ai> wrote:
>
>> I do have a TimestampExtractor setup and for the 10 second windows that
>> are emitted all the values expected in those windows are present, e.g. each
>> 10 second window gets 100 values aggregated into it.
>>
>> I have no metrics with null keys or values.
>>
>> I will try to get the entire reproduction case packaged up in a way that
>> I can more easily share.
>>
>>
>> On Thu, Jun 15, 2017 at 11:18 AM, Matthias J. Sax <ma...@confluent.io>
>> wrote:
>>
>>> Another thing to consider? Do you have records will null key or value?
>>> Those records would be dropped and not processes.
>>>
>>> -Matthias
>>>
>>> On 6/15/17 6:24 AM, Garrett Barton wrote:
>>> > Is your time usage correct?  It sounds like you want event time not
>>> > load/process time which is default unless you have a TimestampExtractor
>>> > defined somewhere upstream?  Otherwise I could see far fewer events
>>> coming
>>> > out as streams is just aggregating whatever showed up in that 10 second
>>> > window.
>>> >
>>> > On Wed, Jun 14, 2017 at 8:43 PM, Caleb Welton <ca...@autonomic.ai>
>>> wrote:
>>> >
>>> >> Disabling the cache with:
>>> >>
>>> >> ```
>>> >> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFE
>>> RING_CONFIG,
>>> >> 0)
>>> >> ```
>>> >>
>>> >> Results in:
>>> >> - Emitting many more intermediate calculations.
>>> >> - Still losing data.
>>> >>
>>> >> In my test case it output 342476 intermediate calculations for 3414
>>> >> distinct windows, 14400 distinct were expected.
>>> >>
>>> >> Regards,
>>> >>   Caleb
>>> >>
>>> >> On Wed, Jun 14, 2017 at 5:13 PM, Matthias J. Sax <
>>> matthias@confluent.io>
>>> >> wrote:
>>> >>
>>> >>> This seems to be related to internal KTable caches. You can disable
>>> them
>>> >>> by setting cache size to zero.
>>> >>>
>>> >>> http://docs.confluent.io/current/streams/developer-
>>> >>> guide.html#memory-management
>>> >>>
>>> >>> -Matthias
>>> >>>
>>> >>>
>>> >>>
>>> >>> On 6/14/17 4:08 PM, Caleb Welton wrote:
>>> >>>> Update, if I set `StreamsConfig.NUM_STREAM_THREADS_CONFIG=1` then
>>> the
>>> >>>> problem does not manifest, at `StreamsConfig.NUM_STREAM_
>>> >>> THREADS_CONFIG=2`
>>> >>>> or higher the problem shows up.
>>> >>>>
>>> >>>> When the number of threads is 1 the speed of data through the first
>>> >> part
>>> >>> of
>>> >>>> the topology (before the ktable) slows down considerably, but it
>>> seems
>>> >> to
>>> >>>> slow down to the speed of the output which may be the key.
>>> >>>>
>>> >>>> That said... Changing the number of stream threads should not impact
>>> >> data
>>> >>>> correctness.  Seems like a bug someplace in kafka.
>>> >>>>
>>> >>>>
>>> >>>>
>>> >>>> On Wed, Jun 14, 2017 at 2:53 PM, Caleb Welton <ca...@autonomic.ai>
>>> >>> wrote:
>>> >>>>
>>> >>>>> I have a topology of
>>> >>>>>     KStream -> KTable -> KStream
>>> >>>>>
>>> >>>>> ```
>>> >>>>>
>>> >>>>> final KStreamBuilder builder = new KStreamBuilder();
>>> >>>>> final KStream<String, Metric> metricStream =
>>> >>> builder.stream(ingestTopic);
>>> >>>>> final KTable<Windowed<String>, MyThing> myTable = metricStream
>>> >>>>>         .groupByKey(stringSerde, mySerde)
>>> >>>>>         .reduce(MyThing::merge,
>>> >>>>>                 TimeWindows.of(10000).advanceBy(10000).until(
>>> >>> Duration.ofDays(retentionDays).toMillis()),
>>> >>>>>                 tableTopic);
>>> >>>>>
>>> >>>>> myTable.toStream()
>>> >>>>>         .map((key, value) -> { return (KeyValue.pair(key.key(),
>>> >>> value.finalize(key.window()))); })
>>> >>>>>         .to(stringSerde, mySerde, sinkTopic);
>>> >>>>>
>>> >>>>> ```
>>> >>>>>
>>> >>>>>
>>> >>>>> Normally went sent data at 10x a second I expect ~1 output metric
>>> for
>>> >>>>> every 100 metrics it receives, based on the 10 second window width.
>>> >>>>>
>>> >>>>> When fed data real time at that rate it seems to do just that.
>>> >>>>>
>>> >>>>> However when I either reprocess on an input topic with a large
>>> amount
>>> >> of
>>> >>>>> data or feed data in significantly faster I see a very different
>>> >>> behavior.
>>> >>>>>
>>> >>>>> Over the course of 20 seconds I can see 1,440,000 messages being
>>> >>> ingested
>>> >>>>> into the ktable, but only 633 emitted from it (expected 14400).
>>> >>>>>
>>> >>>>> Over the next minute the records output creeps to 1796, but then
>>> holds
>>> >>>>> steady and does not keep going up to the expected total of 14400.
>>> >>>>>
>>> >>>>> A consumer reading from the sinkTopic ends up finding about 1264,
>>> >> which
>>> >>> is
>>> >>>>> lower than the 1796 records I would have anticipated from the
>>> number
>>> >> of
>>> >>>>> calls into the final kstream map function.
>>> >>>>>
>>> >>>>> Precise number of emitted records will vary from one run to the
>>> next.
>>> >>>>>
>>> >>>>> Where are the extra metrics going?  Is there some commit issue
>>> that is
>>> >>>>> causing dropped messages if the ktable producer isn't able to keep
>>> up?
>>> >>>>>
>>> >>>>> Any recommendations on where to focus the investigation of the
>>> issue?
>>> >>>>>
>>> >>>>> Running Kafka 0.10.2.1.
>>> >>>>>
>>> >>>>> Thanks,
>>> >>>>>   Caleb
>>> >>>>>
>>> >>>>
>>> >>>
>>> >>>
>>> >>
>>> >
>>>
>>>
>>
>

Re: Dropped messages in kstreams?

Posted by Caleb Welton <ca...@autonomic.ai>.
I have encapsulated the repro into a small self contained project:
https://github.com/cwelton/kstreams-repro

Thanks,
  Caleb


On Thu, Jun 15, 2017 at 11:30 AM, Caleb Welton <ca...@autonomic.ai> wrote:

> I do have a TimestampExtractor setup and for the 10 second windows that
> are emitted all the values expected in those windows are present, e.g. each
> 10 second window gets 100 values aggregated into it.
>
> I have no metrics with null keys or values.
>
> I will try to get the entire reproduction case packaged up in a way that I
> can more easily share.
>
>
> On Thu, Jun 15, 2017 at 11:18 AM, Matthias J. Sax <ma...@confluent.io>
> wrote:
>
>> Another thing to consider? Do you have records will null key or value?
>> Those records would be dropped and not processes.
>>
>> -Matthias
>>
>> On 6/15/17 6:24 AM, Garrett Barton wrote:
>> > Is your time usage correct?  It sounds like you want event time not
>> > load/process time which is default unless you have a TimestampExtractor
>> > defined somewhere upstream?  Otherwise I could see far fewer events
>> coming
>> > out as streams is just aggregating whatever showed up in that 10 second
>> > window.
>> >
>> > On Wed, Jun 14, 2017 at 8:43 PM, Caleb Welton <ca...@autonomic.ai>
>> wrote:
>> >
>> >> Disabling the cache with:
>> >>
>> >> ```
>> >> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFE
>> RING_CONFIG,
>> >> 0)
>> >> ```
>> >>
>> >> Results in:
>> >> - Emitting many more intermediate calculations.
>> >> - Still losing data.
>> >>
>> >> In my test case it output 342476 intermediate calculations for 3414
>> >> distinct windows, 14400 distinct were expected.
>> >>
>> >> Regards,
>> >>   Caleb
>> >>
>> >> On Wed, Jun 14, 2017 at 5:13 PM, Matthias J. Sax <
>> matthias@confluent.io>
>> >> wrote:
>> >>
>> >>> This seems to be related to internal KTable caches. You can disable
>> them
>> >>> by setting cache size to zero.
>> >>>
>> >>> http://docs.confluent.io/current/streams/developer-
>> >>> guide.html#memory-management
>> >>>
>> >>> -Matthias
>> >>>
>> >>>
>> >>>
>> >>> On 6/14/17 4:08 PM, Caleb Welton wrote:
>> >>>> Update, if I set `StreamsConfig.NUM_STREAM_THREADS_CONFIG=1` then
>> the
>> >>>> problem does not manifest, at `StreamsConfig.NUM_STREAM_
>> >>> THREADS_CONFIG=2`
>> >>>> or higher the problem shows up.
>> >>>>
>> >>>> When the number of threads is 1 the speed of data through the first
>> >> part
>> >>> of
>> >>>> the topology (before the ktable) slows down considerably, but it
>> seems
>> >> to
>> >>>> slow down to the speed of the output which may be the key.
>> >>>>
>> >>>> That said... Changing the number of stream threads should not impact
>> >> data
>> >>>> correctness.  Seems like a bug someplace in kafka.
>> >>>>
>> >>>>
>> >>>>
>> >>>> On Wed, Jun 14, 2017 at 2:53 PM, Caleb Welton <ca...@autonomic.ai>
>> >>> wrote:
>> >>>>
>> >>>>> I have a topology of
>> >>>>>     KStream -> KTable -> KStream
>> >>>>>
>> >>>>> ```
>> >>>>>
>> >>>>> final KStreamBuilder builder = new KStreamBuilder();
>> >>>>> final KStream<String, Metric> metricStream =
>> >>> builder.stream(ingestTopic);
>> >>>>> final KTable<Windowed<String>, MyThing> myTable = metricStream
>> >>>>>         .groupByKey(stringSerde, mySerde)
>> >>>>>         .reduce(MyThing::merge,
>> >>>>>                 TimeWindows.of(10000).advanceBy(10000).until(
>> >>> Duration.ofDays(retentionDays).toMillis()),
>> >>>>>                 tableTopic);
>> >>>>>
>> >>>>> myTable.toStream()
>> >>>>>         .map((key, value) -> { return (KeyValue.pair(key.key(),
>> >>> value.finalize(key.window()))); })
>> >>>>>         .to(stringSerde, mySerde, sinkTopic);
>> >>>>>
>> >>>>> ```
>> >>>>>
>> >>>>>
>> >>>>> Normally went sent data at 10x a second I expect ~1 output metric
>> for
>> >>>>> every 100 metrics it receives, based on the 10 second window width.
>> >>>>>
>> >>>>> When fed data real time at that rate it seems to do just that.
>> >>>>>
>> >>>>> However when I either reprocess on an input topic with a large
>> amount
>> >> of
>> >>>>> data or feed data in significantly faster I see a very different
>> >>> behavior.
>> >>>>>
>> >>>>> Over the course of 20 seconds I can see 1,440,000 messages being
>> >>> ingested
>> >>>>> into the ktable, but only 633 emitted from it (expected 14400).
>> >>>>>
>> >>>>> Over the next minute the records output creeps to 1796, but then
>> holds
>> >>>>> steady and does not keep going up to the expected total of 14400.
>> >>>>>
>> >>>>> A consumer reading from the sinkTopic ends up finding about 1264,
>> >> which
>> >>> is
>> >>>>> lower than the 1796 records I would have anticipated from the number
>> >> of
>> >>>>> calls into the final kstream map function.
>> >>>>>
>> >>>>> Precise number of emitted records will vary from one run to the
>> next.
>> >>>>>
>> >>>>> Where are the extra metrics going?  Is there some commit issue that
>> is
>> >>>>> causing dropped messages if the ktable producer isn't able to keep
>> up?
>> >>>>>
>> >>>>> Any recommendations on where to focus the investigation of the
>> issue?
>> >>>>>
>> >>>>> Running Kafka 0.10.2.1.
>> >>>>>
>> >>>>> Thanks,
>> >>>>>   Caleb
>> >>>>>
>> >>>>
>> >>>
>> >>>
>> >>
>> >
>>
>>
>

Re: Dropped messages in kstreams?

Posted by Caleb Welton <ca...@autonomic.ai>.
I do have a TimestampExtractor setup and for the 10 second windows that are
emitted all the values expected in those windows are present, e.g. each 10
second window gets 100 values aggregated into it.

I have no metrics with null keys or values.

I will try to get the entire reproduction case packaged up in a way that I
can more easily share.


On Thu, Jun 15, 2017 at 11:18 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Another thing to consider? Do you have records will null key or value?
> Those records would be dropped and not processes.
>
> -Matthias
>
> On 6/15/17 6:24 AM, Garrett Barton wrote:
> > Is your time usage correct?  It sounds like you want event time not
> > load/process time which is default unless you have a TimestampExtractor
> > defined somewhere upstream?  Otherwise I could see far fewer events
> coming
> > out as streams is just aggregating whatever showed up in that 10 second
> > window.
> >
> > On Wed, Jun 14, 2017 at 8:43 PM, Caleb Welton <ca...@autonomic.ai>
> wrote:
> >
> >> Disabling the cache with:
> >>
> >> ```
> >> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_
> BUFFERING_CONFIG,
> >> 0)
> >> ```
> >>
> >> Results in:
> >> - Emitting many more intermediate calculations.
> >> - Still losing data.
> >>
> >> In my test case it output 342476 intermediate calculations for 3414
> >> distinct windows, 14400 distinct were expected.
> >>
> >> Regards,
> >>   Caleb
> >>
> >> On Wed, Jun 14, 2017 at 5:13 PM, Matthias J. Sax <matthias@confluent.io
> >
> >> wrote:
> >>
> >>> This seems to be related to internal KTable caches. You can disable
> them
> >>> by setting cache size to zero.
> >>>
> >>> http://docs.confluent.io/current/streams/developer-
> >>> guide.html#memory-management
> >>>
> >>> -Matthias
> >>>
> >>>
> >>>
> >>> On 6/14/17 4:08 PM, Caleb Welton wrote:
> >>>> Update, if I set `StreamsConfig.NUM_STREAM_THREADS_CONFIG=1` then the
> >>>> problem does not manifest, at `StreamsConfig.NUM_STREAM_
> >>> THREADS_CONFIG=2`
> >>>> or higher the problem shows up.
> >>>>
> >>>> When the number of threads is 1 the speed of data through the first
> >> part
> >>> of
> >>>> the topology (before the ktable) slows down considerably, but it seems
> >> to
> >>>> slow down to the speed of the output which may be the key.
> >>>>
> >>>> That said... Changing the number of stream threads should not impact
> >> data
> >>>> correctness.  Seems like a bug someplace in kafka.
> >>>>
> >>>>
> >>>>
> >>>> On Wed, Jun 14, 2017 at 2:53 PM, Caleb Welton <ca...@autonomic.ai>
> >>> wrote:
> >>>>
> >>>>> I have a topology of
> >>>>>     KStream -> KTable -> KStream
> >>>>>
> >>>>> ```
> >>>>>
> >>>>> final KStreamBuilder builder = new KStreamBuilder();
> >>>>> final KStream<String, Metric> metricStream =
> >>> builder.stream(ingestTopic);
> >>>>> final KTable<Windowed<String>, MyThing> myTable = metricStream
> >>>>>         .groupByKey(stringSerde, mySerde)
> >>>>>         .reduce(MyThing::merge,
> >>>>>                 TimeWindows.of(10000).advanceBy(10000).until(
> >>> Duration.ofDays(retentionDays).toMillis()),
> >>>>>                 tableTopic);
> >>>>>
> >>>>> myTable.toStream()
> >>>>>         .map((key, value) -> { return (KeyValue.pair(key.key(),
> >>> value.finalize(key.window()))); })
> >>>>>         .to(stringSerde, mySerde, sinkTopic);
> >>>>>
> >>>>> ```
> >>>>>
> >>>>>
> >>>>> Normally went sent data at 10x a second I expect ~1 output metric for
> >>>>> every 100 metrics it receives, based on the 10 second window width.
> >>>>>
> >>>>> When fed data real time at that rate it seems to do just that.
> >>>>>
> >>>>> However when I either reprocess on an input topic with a large amount
> >> of
> >>>>> data or feed data in significantly faster I see a very different
> >>> behavior.
> >>>>>
> >>>>> Over the course of 20 seconds I can see 1,440,000 messages being
> >>> ingested
> >>>>> into the ktable, but only 633 emitted from it (expected 14400).
> >>>>>
> >>>>> Over the next minute the records output creeps to 1796, but then
> holds
> >>>>> steady and does not keep going up to the expected total of 14400.
> >>>>>
> >>>>> A consumer reading from the sinkTopic ends up finding about 1264,
> >> which
> >>> is
> >>>>> lower than the 1796 records I would have anticipated from the number
> >> of
> >>>>> calls into the final kstream map function.
> >>>>>
> >>>>> Precise number of emitted records will vary from one run to the next.
> >>>>>
> >>>>> Where are the extra metrics going?  Is there some commit issue that
> is
> >>>>> causing dropped messages if the ktable producer isn't able to keep
> up?
> >>>>>
> >>>>> Any recommendations on where to focus the investigation of the issue?
> >>>>>
> >>>>> Running Kafka 0.10.2.1.
> >>>>>
> >>>>> Thanks,
> >>>>>   Caleb
> >>>>>
> >>>>
> >>>
> >>>
> >>
> >
>
>

Re: Dropped messages in kstreams?

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Another thing to consider? Do you have records will null key or value?
Those records would be dropped and not processes.

-Matthias

On 6/15/17 6:24 AM, Garrett Barton wrote:
> Is your time usage correct?  It sounds like you want event time not
> load/process time which is default unless you have a TimestampExtractor
> defined somewhere upstream?  Otherwise I could see far fewer events coming
> out as streams is just aggregating whatever showed up in that 10 second
> window.
> 
> On Wed, Jun 14, 2017 at 8:43 PM, Caleb Welton <ca...@autonomic.ai> wrote:
> 
>> Disabling the cache with:
>>
>> ```
>> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
>> 0)
>> ```
>>
>> Results in:
>> - Emitting many more intermediate calculations.
>> - Still losing data.
>>
>> In my test case it output 342476 intermediate calculations for 3414
>> distinct windows, 14400 distinct were expected.
>>
>> Regards,
>>   Caleb
>>
>> On Wed, Jun 14, 2017 at 5:13 PM, Matthias J. Sax <ma...@confluent.io>
>> wrote:
>>
>>> This seems to be related to internal KTable caches. You can disable them
>>> by setting cache size to zero.
>>>
>>> http://docs.confluent.io/current/streams/developer-
>>> guide.html#memory-management
>>>
>>> -Matthias
>>>
>>>
>>>
>>> On 6/14/17 4:08 PM, Caleb Welton wrote:
>>>> Update, if I set `StreamsConfig.NUM_STREAM_THREADS_CONFIG=1` then the
>>>> problem does not manifest, at `StreamsConfig.NUM_STREAM_
>>> THREADS_CONFIG=2`
>>>> or higher the problem shows up.
>>>>
>>>> When the number of threads is 1 the speed of data through the first
>> part
>>> of
>>>> the topology (before the ktable) slows down considerably, but it seems
>> to
>>>> slow down to the speed of the output which may be the key.
>>>>
>>>> That said... Changing the number of stream threads should not impact
>> data
>>>> correctness.  Seems like a bug someplace in kafka.
>>>>
>>>>
>>>>
>>>> On Wed, Jun 14, 2017 at 2:53 PM, Caleb Welton <ca...@autonomic.ai>
>>> wrote:
>>>>
>>>>> I have a topology of
>>>>>     KStream -> KTable -> KStream
>>>>>
>>>>> ```
>>>>>
>>>>> final KStreamBuilder builder = new KStreamBuilder();
>>>>> final KStream<String, Metric> metricStream =
>>> builder.stream(ingestTopic);
>>>>> final KTable<Windowed<String>, MyThing> myTable = metricStream
>>>>>         .groupByKey(stringSerde, mySerde)
>>>>>         .reduce(MyThing::merge,
>>>>>                 TimeWindows.of(10000).advanceBy(10000).until(
>>> Duration.ofDays(retentionDays).toMillis()),
>>>>>                 tableTopic);
>>>>>
>>>>> myTable.toStream()
>>>>>         .map((key, value) -> { return (KeyValue.pair(key.key(),
>>> value.finalize(key.window()))); })
>>>>>         .to(stringSerde, mySerde, sinkTopic);
>>>>>
>>>>> ```
>>>>>
>>>>>
>>>>> Normally went sent data at 10x a second I expect ~1 output metric for
>>>>> every 100 metrics it receives, based on the 10 second window width.
>>>>>
>>>>> When fed data real time at that rate it seems to do just that.
>>>>>
>>>>> However when I either reprocess on an input topic with a large amount
>> of
>>>>> data or feed data in significantly faster I see a very different
>>> behavior.
>>>>>
>>>>> Over the course of 20 seconds I can see 1,440,000 messages being
>>> ingested
>>>>> into the ktable, but only 633 emitted from it (expected 14400).
>>>>>
>>>>> Over the next minute the records output creeps to 1796, but then holds
>>>>> steady and does not keep going up to the expected total of 14400.
>>>>>
>>>>> A consumer reading from the sinkTopic ends up finding about 1264,
>> which
>>> is
>>>>> lower than the 1796 records I would have anticipated from the number
>> of
>>>>> calls into the final kstream map function.
>>>>>
>>>>> Precise number of emitted records will vary from one run to the next.
>>>>>
>>>>> Where are the extra metrics going?  Is there some commit issue that is
>>>>> causing dropped messages if the ktable producer isn't able to keep up?
>>>>>
>>>>> Any recommendations on where to focus the investigation of the issue?
>>>>>
>>>>> Running Kafka 0.10.2.1.
>>>>>
>>>>> Thanks,
>>>>>   Caleb
>>>>>
>>>>
>>>
>>>
>>
> 


Re: Dropped messages in kstreams?

Posted by Garrett Barton <ga...@gmail.com>.
Is your time usage correct?  It sounds like you want event time not
load/process time which is default unless you have a TimestampExtractor
defined somewhere upstream?  Otherwise I could see far fewer events coming
out as streams is just aggregating whatever showed up in that 10 second
window.

On Wed, Jun 14, 2017 at 8:43 PM, Caleb Welton <ca...@autonomic.ai> wrote:

> Disabling the cache with:
>
> ```
> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
> 0)
> ```
>
> Results in:
> - Emitting many more intermediate calculations.
> - Still losing data.
>
> In my test case it output 342476 intermediate calculations for 3414
> distinct windows, 14400 distinct were expected.
>
> Regards,
>   Caleb
>
> On Wed, Jun 14, 2017 at 5:13 PM, Matthias J. Sax <ma...@confluent.io>
> wrote:
>
> > This seems to be related to internal KTable caches. You can disable them
> > by setting cache size to zero.
> >
> > http://docs.confluent.io/current/streams/developer-
> > guide.html#memory-management
> >
> > -Matthias
> >
> >
> >
> > On 6/14/17 4:08 PM, Caleb Welton wrote:
> > > Update, if I set `StreamsConfig.NUM_STREAM_THREADS_CONFIG=1` then the
> > > problem does not manifest, at `StreamsConfig.NUM_STREAM_
> > THREADS_CONFIG=2`
> > > or higher the problem shows up.
> > >
> > > When the number of threads is 1 the speed of data through the first
> part
> > of
> > > the topology (before the ktable) slows down considerably, but it seems
> to
> > > slow down to the speed of the output which may be the key.
> > >
> > > That said... Changing the number of stream threads should not impact
> data
> > > correctness.  Seems like a bug someplace in kafka.
> > >
> > >
> > >
> > > On Wed, Jun 14, 2017 at 2:53 PM, Caleb Welton <ca...@autonomic.ai>
> > wrote:
> > >
> > >> I have a topology of
> > >>     KStream -> KTable -> KStream
> > >>
> > >> ```
> > >>
> > >> final KStreamBuilder builder = new KStreamBuilder();
> > >> final KStream<String, Metric> metricStream =
> > builder.stream(ingestTopic);
> > >> final KTable<Windowed<String>, MyThing> myTable = metricStream
> > >>         .groupByKey(stringSerde, mySerde)
> > >>         .reduce(MyThing::merge,
> > >>                 TimeWindows.of(10000).advanceBy(10000).until(
> > Duration.ofDays(retentionDays).toMillis()),
> > >>                 tableTopic);
> > >>
> > >> myTable.toStream()
> > >>         .map((key, value) -> { return (KeyValue.pair(key.key(),
> > value.finalize(key.window()))); })
> > >>         .to(stringSerde, mySerde, sinkTopic);
> > >>
> > >> ```
> > >>
> > >>
> > >> Normally went sent data at 10x a second I expect ~1 output metric for
> > >> every 100 metrics it receives, based on the 10 second window width.
> > >>
> > >> When fed data real time at that rate it seems to do just that.
> > >>
> > >> However when I either reprocess on an input topic with a large amount
> of
> > >> data or feed data in significantly faster I see a very different
> > behavior.
> > >>
> > >> Over the course of 20 seconds I can see 1,440,000 messages being
> > ingested
> > >> into the ktable, but only 633 emitted from it (expected 14400).
> > >>
> > >> Over the next minute the records output creeps to 1796, but then holds
> > >> steady and does not keep going up to the expected total of 14400.
> > >>
> > >> A consumer reading from the sinkTopic ends up finding about 1264,
> which
> > is
> > >> lower than the 1796 records I would have anticipated from the number
> of
> > >> calls into the final kstream map function.
> > >>
> > >> Precise number of emitted records will vary from one run to the next.
> > >>
> > >> Where are the extra metrics going?  Is there some commit issue that is
> > >> causing dropped messages if the ktable producer isn't able to keep up?
> > >>
> > >> Any recommendations on where to focus the investigation of the issue?
> > >>
> > >> Running Kafka 0.10.2.1.
> > >>
> > >> Thanks,
> > >>   Caleb
> > >>
> > >
> >
> >
>

Re: Dropped messages in kstreams?

Posted by Caleb Welton <ca...@autonomic.ai>.
Disabling the cache with:

```
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0)
```

Results in:
- Emitting many more intermediate calculations.
- Still losing data.

In my test case it output 342476 intermediate calculations for 3414
distinct windows, 14400 distinct were expected.

Regards,
  Caleb

On Wed, Jun 14, 2017 at 5:13 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> This seems to be related to internal KTable caches. You can disable them
> by setting cache size to zero.
>
> http://docs.confluent.io/current/streams/developer-
> guide.html#memory-management
>
> -Matthias
>
>
>
> On 6/14/17 4:08 PM, Caleb Welton wrote:
> > Update, if I set `StreamsConfig.NUM_STREAM_THREADS_CONFIG=1` then the
> > problem does not manifest, at `StreamsConfig.NUM_STREAM_
> THREADS_CONFIG=2`
> > or higher the problem shows up.
> >
> > When the number of threads is 1 the speed of data through the first part
> of
> > the topology (before the ktable) slows down considerably, but it seems to
> > slow down to the speed of the output which may be the key.
> >
> > That said... Changing the number of stream threads should not impact data
> > correctness.  Seems like a bug someplace in kafka.
> >
> >
> >
> > On Wed, Jun 14, 2017 at 2:53 PM, Caleb Welton <ca...@autonomic.ai>
> wrote:
> >
> >> I have a topology of
> >>     KStream -> KTable -> KStream
> >>
> >> ```
> >>
> >> final KStreamBuilder builder = new KStreamBuilder();
> >> final KStream<String, Metric> metricStream =
> builder.stream(ingestTopic);
> >> final KTable<Windowed<String>, MyThing> myTable = metricStream
> >>         .groupByKey(stringSerde, mySerde)
> >>         .reduce(MyThing::merge,
> >>                 TimeWindows.of(10000).advanceBy(10000).until(
> Duration.ofDays(retentionDays).toMillis()),
> >>                 tableTopic);
> >>
> >> myTable.toStream()
> >>         .map((key, value) -> { return (KeyValue.pair(key.key(),
> value.finalize(key.window()))); })
> >>         .to(stringSerde, mySerde, sinkTopic);
> >>
> >> ```
> >>
> >>
> >> Normally went sent data at 10x a second I expect ~1 output metric for
> >> every 100 metrics it receives, based on the 10 second window width.
> >>
> >> When fed data real time at that rate it seems to do just that.
> >>
> >> However when I either reprocess on an input topic with a large amount of
> >> data or feed data in significantly faster I see a very different
> behavior.
> >>
> >> Over the course of 20 seconds I can see 1,440,000 messages being
> ingested
> >> into the ktable, but only 633 emitted from it (expected 14400).
> >>
> >> Over the next minute the records output creeps to 1796, but then holds
> >> steady and does not keep going up to the expected total of 14400.
> >>
> >> A consumer reading from the sinkTopic ends up finding about 1264, which
> is
> >> lower than the 1796 records I would have anticipated from the number of
> >> calls into the final kstream map function.
> >>
> >> Precise number of emitted records will vary from one run to the next.
> >>
> >> Where are the extra metrics going?  Is there some commit issue that is
> >> causing dropped messages if the ktable producer isn't able to keep up?
> >>
> >> Any recommendations on where to focus the investigation of the issue?
> >>
> >> Running Kafka 0.10.2.1.
> >>
> >> Thanks,
> >>   Caleb
> >>
> >
>
>

Re: Dropped messages in kstreams?

Posted by "Matthias J. Sax" <ma...@confluent.io>.
This seems to be related to internal KTable caches. You can disable them
by setting cache size to zero.

http://docs.confluent.io/current/streams/developer-guide.html#memory-management

-Matthias



On 6/14/17 4:08 PM, Caleb Welton wrote:
> Update, if I set `StreamsConfig.NUM_STREAM_THREADS_CONFIG=1` then the
> problem does not manifest, at `StreamsConfig.NUM_STREAM_THREADS_CONFIG=2`
> or higher the problem shows up.
> 
> When the number of threads is 1 the speed of data through the first part of
> the topology (before the ktable) slows down considerably, but it seems to
> slow down to the speed of the output which may be the key.
> 
> That said... Changing the number of stream threads should not impact data
> correctness.  Seems like a bug someplace in kafka.
> 
> 
> 
> On Wed, Jun 14, 2017 at 2:53 PM, Caleb Welton <ca...@autonomic.ai> wrote:
> 
>> I have a topology of
>>     KStream -> KTable -> KStream
>>
>> ```
>>
>> final KStreamBuilder builder = new KStreamBuilder();
>> final KStream<String, Metric> metricStream = builder.stream(ingestTopic);
>> final KTable<Windowed<String>, MyThing> myTable = metricStream
>>         .groupByKey(stringSerde, mySerde)
>>         .reduce(MyThing::merge,
>>                 TimeWindows.of(10000).advanceBy(10000).until(Duration.ofDays(retentionDays).toMillis()),
>>                 tableTopic);
>>
>> myTable.toStream()
>>         .map((key, value) -> { return (KeyValue.pair(key.key(), value.finalize(key.window()))); })
>>         .to(stringSerde, mySerde, sinkTopic);
>>
>> ```
>>
>>
>> Normally went sent data at 10x a second I expect ~1 output metric for
>> every 100 metrics it receives, based on the 10 second window width.
>>
>> When fed data real time at that rate it seems to do just that.
>>
>> However when I either reprocess on an input topic with a large amount of
>> data or feed data in significantly faster I see a very different behavior.
>>
>> Over the course of 20 seconds I can see 1,440,000 messages being ingested
>> into the ktable, but only 633 emitted from it (expected 14400).
>>
>> Over the next minute the records output creeps to 1796, but then holds
>> steady and does not keep going up to the expected total of 14400.
>>
>> A consumer reading from the sinkTopic ends up finding about 1264, which is
>> lower than the 1796 records I would have anticipated from the number of
>> calls into the final kstream map function.
>>
>> Precise number of emitted records will vary from one run to the next.
>>
>> Where are the extra metrics going?  Is there some commit issue that is
>> causing dropped messages if the ktable producer isn't able to keep up?
>>
>> Any recommendations on where to focus the investigation of the issue?
>>
>> Running Kafka 0.10.2.1.
>>
>> Thanks,
>>   Caleb
>>
> 


Re: Dropped messages in kstreams?

Posted by Caleb Welton <ca...@autonomic.ai>.
Update, if I set `StreamsConfig.NUM_STREAM_THREADS_CONFIG=1` then the
problem does not manifest, at `StreamsConfig.NUM_STREAM_THREADS_CONFIG=2`
or higher the problem shows up.

When the number of threads is 1 the speed of data through the first part of
the topology (before the ktable) slows down considerably, but it seems to
slow down to the speed of the output which may be the key.

That said... Changing the number of stream threads should not impact data
correctness.  Seems like a bug someplace in kafka.



On Wed, Jun 14, 2017 at 2:53 PM, Caleb Welton <ca...@autonomic.ai> wrote:

> I have a topology of
>     KStream -> KTable -> KStream
>
> ```
>
> final KStreamBuilder builder = new KStreamBuilder();
> final KStream<String, Metric> metricStream = builder.stream(ingestTopic);
> final KTable<Windowed<String>, MyThing> myTable = metricStream
>         .groupByKey(stringSerde, mySerde)
>         .reduce(MyThing::merge,
>                 TimeWindows.of(10000).advanceBy(10000).until(Duration.ofDays(retentionDays).toMillis()),
>                 tableTopic);
>
> myTable.toStream()
>         .map((key, value) -> { return (KeyValue.pair(key.key(), value.finalize(key.window()))); })
>         .to(stringSerde, mySerde, sinkTopic);
>
> ```
>
>
> Normally went sent data at 10x a second I expect ~1 output metric for
> every 100 metrics it receives, based on the 10 second window width.
>
> When fed data real time at that rate it seems to do just that.
>
> However when I either reprocess on an input topic with a large amount of
> data or feed data in significantly faster I see a very different behavior.
>
> Over the course of 20 seconds I can see 1,440,000 messages being ingested
> into the ktable, but only 633 emitted from it (expected 14400).
>
> Over the next minute the records output creeps to 1796, but then holds
> steady and does not keep going up to the expected total of 14400.
>
> A consumer reading from the sinkTopic ends up finding about 1264, which is
> lower than the 1796 records I would have anticipated from the number of
> calls into the final kstream map function.
>
> Precise number of emitted records will vary from one run to the next.
>
> Where are the extra metrics going?  Is there some commit issue that is
> causing dropped messages if the ktable producer isn't able to keep up?
>
> Any recommendations on where to focus the investigation of the issue?
>
> Running Kafka 0.10.2.1.
>
> Thanks,
>   Caleb
>