You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Mahendra Kariya <ma...@go-jek.com> on 2017/05/02 10:27:43 UTC

Re: Debugging Kafka Streams Windowing

Hi Matthias,

What we did was read the data from sink topic and print it to console. And
here's the raw data from that topic (the counts are randomized). As we can
see, the data is certainly missing for some time windows. For instance,
after 1493693760, the next timestamp for which the data is present
is 1493694300. That's around 9 minutes of data missing.

And this is just one instance. There are a lot of such instances in this
file.



On Sun, Apr 30, 2017 at 11:23 AM, Mahendra Kariya <
mahendra.kariya@go-jek.com> wrote:

> Thanks for the update Matthias! And sorry for the delayed response.
>
> The reason we use .aggregate() is because we want to count the number of
> unique values for a particular field in the message. So, we just add that
> particular field's value in the HashSet and then take the size of the
> HashSet.
>
> On our side, we are also investigating and it looks like there might be a
> bug somewhere in our codebase. If that's the case, then it's quite possible
> that there is no bug in Kafka Streams, except the metric one.
>
> We will revert after confirming.
>
>
>
>
> On Sun, Apr 30, 2017 at 10:39 AM, Matthias J. Sax <ma...@confluent.io>
> wrote:
>
>> Just a follow up (we identified a bug in the "skipped records" metric).
>> The reported value is not correct.
>>
>>
>> On 4/28/17 9:12 PM, Matthias J. Sax wrote:
>> > Ok. That makes sense.
>> >
>> > Question: why do you use .aggregate() instead of .count() ?
>> >
>> > Also, can you share the code of you AggregatorFunction()? Did you change
>> > any default setting of StreamsConfig?
>> >
>> > I have still no idea what could go wrong. Maybe you can run with log
>> > level TRACE? Maybe we can get some insight from those.
>> >
>> >
>> > -Matthias
>> >
>> > On 4/27/17 11:41 PM, Mahendra Kariya wrote:
>> >> Oh good point!
>> >>
>> >> The reason why there is only one row corresponding to each time window
>> is
>> >> because it only contains the latest value for the time window. So what
>> we
>> >> did was we just dumped the data present in the sink topic to a db
>> using an
>> >> upsert query. The primary key of the table was time window. The file
>> that I
>> >> attached is actually the data present in the DB. And we know that
>> there is
>> >> no bug in our db dump code because we have been using it for a long
>> time in
>> >> production without any issues.
>> >>
>> >> The reason the count is zero for some time windows is because I
>> subtracted
>> >> a random number the actual values and rounded it off to zero; for
>> privacy
>> >> reason. The actual data doesn't have any zero values. I should have
>> >> mentioned this earlier. My bad!
>> >>
>> >> The stream topology code looks something like this.
>> >>
>> >> stream
>> >>     .filter()
>> >>     .map((key, value) -> new KeyValue<>(transform(key), value)
>> >>     .groupByKey()
>> >>     .aggregate(HashSet::new, AggregatorFunction(),
>> >> TimeWindows.of(60000).until(3600000))
>> >>     .mapValues(HashSet::size)
>> >>     .toStream()
>> >>     .map((key, value) -> convertToProtobufObject(key, value))
>> >>     .to()
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> On Fri, Apr 28, 2017 at 1:13 PM, Matthias J. Sax <
>> matthias@confluent.io>
>> >> wrote:
>> >>
>> >>> Thanks for the details (sorry that I forgot that you did share the
>> >>> output already).
>> >>>
>> >>> Might be a dumb question, but what is the count for missing windows in
>> >>> your seconds implementation?
>> >>>
>> >>> If there is no data for a window, it should not emit a window with
>> count
>> >>> zero, but nothing.
>> >>>
>> >>> Thus, looking at your output, I am wondering how it could contain line
>> >>> like:
>> >>>
>> >>>> 2017-04-27T04:53:00 0
>> >>>
>> >>> I am also wondering why your output only contains a single value per
>> >>> window. As Streams outputs multiple updates per window while the count
>> >>> is increasing, you should actually see multiple records per window.
>> >>>
>> >>> Your code is like this:
>> >>>
>> >>> stream.filter().groupByKey().count(TimeWindow.of(60000)).to();
>> >>>
>> >>> Or do you have something more complex?
>> >>>
>> >>>
>> >>> -Matthias
>> >>>
>> >>>
>> >>> On 4/27/17 9:16 PM, Mahendra Kariya wrote:
>> >>>>> Can you somehow verify your output?
>> >>>>
>> >>>>
>> >>>> Do you mean the Kafka streams output? In the Kafka Streams output,
>> we do
>> >>>> see some missing values. I have attached the Kafka Streams output
>> (for a
>> >>>> few hours) in the very first email of this thread for reference.
>> >>>>
>> >>>> Let me also summarise what we have done so far.
>> >>>>
>> >>>> We took a dump of the raw data present in the source topic. We wrote
>> a
>> >>>> script to read this data and do the exact same aggregations that we
>> do
>> >>>> using Kafka Streams. And then we compared the output from Kafka
>> Streams
>> >>> and
>> >>>> our script.
>> >>>>
>> >>>> The difference that we observed in the two outputs is that there
>> were a
>> >>> few
>> >>>> rows (corresponding to some time windows) missing in the Streams
>> output.
>> >>>> For the time windows for which the data was present, the aggregated
>> >>> numbers
>> >>>> matched exactly.
>> >>>>
>> >>>> This means, either all the records for a particular time window are
>> being
>> >>>> skipped, or none. Now this is highly unlikely to happen. Maybe there
>> is a
>> >>>> bug somewhere in the rocksdb state stores? Just a speculation, not
>> sure
>> >>>> though. And there could even be a bug in the reported metric.
>> >>>>
>> >>>
>> >>>
>> >>
>> >
>>
>>
>

Re: Debugging Kafka Streams Windowing

Posted by Mahendra Kariya <ma...@go-jek.com>.
Yes. To some extent. But the rebalancing is now taking a lot of time. There
are situations where we have to manually restart the Streams app because
rebalancing is kind of "stuck" for several minutes.

On 7 June 2017 at 06:28, Garrett Barton <ga...@gmail.com> wrote:

> Mahendra,
>
>  Did increasing those two properties do the trick?  I am running into this
> exact issue testing streams out on a single Kafka instance.  Yet I can
> manually start a consumer and read the topics fine while its busy doing
> this dead stuffs.
>
> On Tue, May 23, 2017 at 12:30 AM, Mahendra Kariya <
> mahendra.kariya@go-jek.com> wrote:
>
> > On 22 May 2017 at 16:09, Guozhang Wang <wa...@gmail.com> wrote:
> >
> > > For
> > > that issue I'd suspect that there is a network issue, or maybe the
> > network
> > > is just saturated already and the heartbeat request / response were not
> > > exchanged in time between the consumer and the broker, or the sockets
> > being
> > > dropped because of socket limit. Under this cases not all consumers may
> > be
> > > affected, but since the associated issue is from "AbstractCoordinator"
> > > class which is part of the consumer client, I'd still be surprised if
> it
> > is
> > > actually due to Streams itself with the same consumer config settings,
> > but
> > > not to consumers.
> > >
> >
> > Yes. This is the conclusion that even we are coming to after further
> > investigation. But didn't want to post it here until we were sure.
> >
> > We are experimenting with increasing the default timeouts, particularly
> > hearbeat.interval.ms and session.timeout.ms. So far, the things have
> been
> > running fine. But we will let it run for a few more days before closing
> > this issue.
> >
>

Re: Debugging Kafka Streams Windowing

Posted by Garrett Barton <ga...@gmail.com>.
Mahendra,

 Did increasing those two properties do the trick?  I am running into this
exact issue testing streams out on a single Kafka instance.  Yet I can
manually start a consumer and read the topics fine while its busy doing
this dead stuffs.

On Tue, May 23, 2017 at 12:30 AM, Mahendra Kariya <
mahendra.kariya@go-jek.com> wrote:

> On 22 May 2017 at 16:09, Guozhang Wang <wa...@gmail.com> wrote:
>
> > For
> > that issue I'd suspect that there is a network issue, or maybe the
> network
> > is just saturated already and the heartbeat request / response were not
> > exchanged in time between the consumer and the broker, or the sockets
> being
> > dropped because of socket limit. Under this cases not all consumers may
> be
> > affected, but since the associated issue is from "AbstractCoordinator"
> > class which is part of the consumer client, I'd still be surprised if it
> is
> > actually due to Streams itself with the same consumer config settings,
> but
> > not to consumers.
> >
>
> Yes. This is the conclusion that even we are coming to after further
> investigation. But didn't want to post it here until we were sure.
>
> We are experimenting with increasing the default timeouts, particularly
> hearbeat.interval.ms and session.timeout.ms. So far, the things have been
> running fine. But we will let it run for a few more days before closing
> this issue.
>

Re: Debugging Kafka Streams Windowing

Posted by Mahendra Kariya <ma...@go-jek.com>.
On 22 May 2017 at 16:09, Guozhang Wang <wa...@gmail.com> wrote:

> For
> that issue I'd suspect that there is a network issue, or maybe the network
> is just saturated already and the heartbeat request / response were not
> exchanged in time between the consumer and the broker, or the sockets being
> dropped because of socket limit. Under this cases not all consumers may be
> affected, but since the associated issue is from "AbstractCoordinator"
> class which is part of the consumer client, I'd still be surprised if it is
> actually due to Streams itself with the same consumer config settings, but
> not to consumers.
>

Yes. This is the conclusion that even we are coming to after further
investigation. But didn't want to post it here until we were sure.

We are experimenting with increasing the default timeouts, particularly
hearbeat.interval.ms and session.timeout.ms. So far, the things have been
running fine. But we will let it run for a few more days before closing
this issue.

Re: Debugging Kafka Streams Windowing

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Manhendra,

Sorry for the late reply.

Just to clarify my previous reply was only for your question about:

"
There is also another issue where a particular broker is marked as dead for
a group id and Streams process never recovers from this exception.
"

And I thought your attached logs are associating with the above described
"exception":

"
16:13:16.527 [StreamThread-7] INFO o.a.k.c.c.i.AbstractCoordinator -
Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for
group grp_id
16:13:16.543 [StreamThread-3] INFO o.a.k.c.c.i.AbstractCoordinator -
Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group
grp_id.
16:13:16.543 [StreamThread-3] INFO o.a.k.c.c.i.AbstractCoordinator -
Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for
group grp_id
16:13:16.547 [StreamThread-6] INFO o.a.k.c.c.i.AbstractCoordinator -
Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group
grp_id.
16:13:16.547 [StreamThread-6] INFO o.a.k.c.c.i.AbstractCoordinator -
Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for
group grp_id
16:13:16.551 [StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator -
Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group
grp_id.
16:13:16.551 [StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator -
Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for
group grp_id
16:13:16.572 [StreamThread-4] INFO o.a.k.c.c.i.AbstractCoordinator -
Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group
grp_id.
16:13:16.572 [StreamThread-4] INFO o.a.k.c.c.i.AbstractCoordinator -
Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for
group grp_id
16:13:16.573 [StreamThread-2] INFO o.a.k.c.c.i.AbstractCoordinator -
Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group
grp_id.

"

I thought you meant that the broker (05:6667) indeed become un-available
transiently, and that streams hence cannot recover from the above state.
But from your last email, it seems that the broker was not actually
crashed, but that some streams' embedded consumer coordinator continuously
mark it as dead and keep trying to re-discover the broker again, right? For
that issue I'd suspect that there is a network issue, or maybe the network
is just saturated already and the heartbeat request / response were not
exchanged in time between the consumer and the broker, or the sockets being
dropped because of socket limit. Under this cases not all consumers may be
affected, but since the associated issue is from "AbstractCoordinator"
class which is part of the consumer client, I'd still be surprised if it is
actually due to Streams itself with the same consumer config settings, but
not to consumers.

Guozhang

On Tue, May 16, 2017 at 8:58 PM, Mahendra Kariya <mahendra.kariya@go-jek.com
> wrote:

> I am confused. If what you have mentioned is the case, then
>
>    - Why would restarting the stream processes resolve the issue?
>    - Why do we get these infinite stream of exceptions only on some boxes
>    in the cluster and not all?
>    - We have tens of other consumers running just fine. We see this issue
>    only in the streams one.
>
>
>
>
> On Tue, May 16, 2017 at 3:36 PM, Guozhang Wang <wa...@gmail.com> wrote:
>
> > Sorry I mis-read your email and confused it with another thread.
> >
> > As for your observed issue, it seems "broker-05:6667" is in an unstable
> > state which is the group coordinator for this stream process app with app
> > id (i.e. group id) "grp_id". Since the streams app cannot commit offsets
> > anymore due to group coordinator not available, it cannot proceed but
> > repeatedly re-discovers the coordinator.
> >
> > This is not generally an issue for streams, but for consumer group
> > membership management. In practice you need to make sure that the offset
> > topic is replicate (I think by default it is 3 replicas) so that whenever
> > the leader of a certain offset topic partition, hence the group
> > coordinator, fails, another broker can take over so that any consumer
> > groups that is corresponding to that offset topic partition won't be
> > blocked.
> >
> >
> > Guozhang
> >
> >
> >
> > On Mon, May 15, 2017 at 7:33 PM, Mahendra Kariya <
> > mahendra.kariya@go-jek.com
> > > wrote:
> >
> > > Thanks for the reply Guozhang! But I think we are talking of 2
> different
> > > issues here. KAFKA-5167 is for LockException. We face this issue
> > > intermittently, but not a lot.
> > >
> > > There is also another issue where a particular broker is marked as dead
> > for
> > > a group id and Streams process never recovers from this exception.
> > >
> > > On Mon, May 15, 2017 at 11:28 PM, Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > >
> > > > I'm wondering if it is possibly due to KAFKA-5167? In that case, the
> > > "other
> > > > thread" will keep retrying on grabbing the lock.
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Sat, May 13, 2017 at 7:30 PM, Mahendra Kariya <
> > > > mahendra.kariya@go-jek.com
> > > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > There is no missing data. But the INFO level logs are infinite and
> > the
> > > > > streams practically stops. For the messages that I posted, we got
> > these
> > > > > INFO logs for around 20 mins. After which we got an alert about no
> > data
> > > > > being produced in the sink topic and we had to restart the streams
> > > > > processes.
> > > > >
> > > > >
> > > > >
> > > > > On Sun, May 14, 2017 at 1:01 AM, Matthias J. Sax <
> > > matthias@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I just dug a little bit. The messages are logged at INFO level
> and
> > > thus
> > > > > > should not be a problem if they go away by themselves after some
> > > time.
> > > > > > Compare:
> > > > > > https://groups.google.com/forum/#!topic/confluent-
> > > platform/A14dkPlDlv4
> > > > > >
> > > > > > Do you still see missing data?
> > > > > >
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > >
> > > > > > On 5/11/17 2:39 AM, Mahendra Kariya wrote:
> > > > > > > Hi Matthias,
> > > > > > >
> > > > > > > We faced the issue again. The logs are below.
> > > > > > >
> > > > > > > 16:13:16.527 [StreamThread-7] INFO o.a.k.c.c.i.
> > AbstractCoordinator
> > > -
> > > > > > > Marking the coordinator broker-05:6667 (id: 2147483642
> > > > > <(214)%20748-3642> rack: null) dead
> > > > > > for
> > > > > > > group grp_id
> > > > > > > 16:13:16.543 [StreamThread-3] INFO o.a.k.c.c.i.
> > AbstractCoordinator
> > > -
> > > > > > > Discovered coordinator broker-05:6667 (id: 2147483642
> > > > > <(214)%20748-3642> rack: null) for
> > > > > > group
> > > > > > > grp_id.
> > > > > > > 16:13:16.543 [StreamThread-3] INFO o.a.k.c.c.i.
> > AbstractCoordinator
> > > -
> > > > > > > Marking the coordinator broker-05:6667 (id: 2147483642
> > > > > <(214)%20748-3642> rack: null) dead
> > > > > > for
> > > > > > > group grp_id
> > > > > > > 16:13:16.547 [StreamThread-6] INFO o.a.k.c.c.i.
> > AbstractCoordinator
> > > -
> > > > > > > Discovered coordinator broker-05:6667 (id: 2147483642
> > > > > <(214)%20748-3642> rack: null) for
> > > > > > group
> > > > > > > grp_id.
> > > > > > > 16:13:16.547 [StreamThread-6] INFO o.a.k.c.c.i.
> > AbstractCoordinator
> > > -
> > > > > > > Marking the coordinator broker-05:6667 (id: 2147483642
> > > > > <(214)%20748-3642> rack: null) dead
> > > > > > for
> > > > > > > group grp_id
> > > > > > > 16:13:16.551 [StreamThread-1] INFO o.a.k.c.c.i.
> > AbstractCoordinator
> > > -
> > > > > > > Discovered coordinator broker-05:6667 (id: 2147483642
> > > > > <(214)%20748-3642> rack: null) for
> > > > > > group
> > > > > > > grp_id.
> > > > > > > 16:13:16.551 [StreamThread-1] INFO o.a.k.c.c.i.
> > AbstractCoordinator
> > > -
> > > > > > > Marking the coordinator broker-05:6667 (id: 2147483642
> > > > > <(214)%20748-3642> rack: null) dead
> > > > > > for
> > > > > > > group grp_id
> > > > > > > 16:13:16.572 [StreamThread-4] INFO o.a.k.c.c.i.
> > AbstractCoordinator
> > > -
> > > > > > > Discovered coordinator broker-05:6667 (id: 2147483642
> > > > > <(214)%20748-3642> rack: null) for
> > > > > > group
> > > > > > > grp_id.
> > > > > > > 16:13:16.572 [StreamThread-4] INFO o.a.k.c.c.i.
> > AbstractCoordinator
> > > -
> > > > > > > Marking the coordinator broker-05:6667 (id: 2147483642
> > > > > <(214)%20748-3642> rack: null) dead
> > > > > > for
> > > > > > > group grp_id
> > > > > > > 16:13:16.573 [StreamThread-2] INFO o.a.k.c.c.i.
> > AbstractCoordinator
> > > -
> > > > > > > Discovered coordinator broker-05:6667 (id: 2147483642
> > > > > <(214)%20748-3642> rack: null) for
> > > > > > group
> > > > > > > grp_id.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Tue, May 9, 2017 at 3:40 AM, Matthias J. Sax <
> > > > matthias@confluent.io
> > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Great! Glad 0.10.2.1 fixes it for you!
> > > > > > >>
> > > > > > >> -Matthias
> > > > > > >>
> > > > > > >> On 5/7/17 8:57 PM, Mahendra Kariya wrote:
> > > > > > >>> Upgrading to 0.10.2.1 seems to have fixed the issue.
> > > > > > >>>
> > > > > > >>> Until now, we were looking at random 1 hour data to analyse
> the
> > > > > issue.
> > > > > > >> Over
> > > > > > >>> the weekend, we have written a simple test that will
> > continuously
> > > > > check
> > > > > > >> for
> > > > > > >>> inconsistencies in real time and report if there is any
> issue.
> > > > > > >>>
> > > > > > >>> No issues have been reported for the last 24 hours. Will
> update
> > > > this
> > > > > > >> thread
> > > > > > >>> if we find any issue.
> > > > > > >>>
> > > > > > >>> Thanks for all the support!
> > > > > > >>>
> > > > > > >>>
> > > > > > >>>
> > > > > > >>> On Fri, May 5, 2017 at 3:55 AM, Matthias J. Sax <
> > > > > matthias@confluent.io
> > > > > > >
> > > > > > >>> wrote:
> > > > > > >>>
> > > > > > >>>> About
> > > > > > >>>>
> > > > > > >>>>> 07:44:08.493 [StreamThread-10] INFO
> > > > o.a.k.c.c.i.AbstractCoordinato
> > > > > r
> > > > > > -
> > > > > > >>>>> Discovered coordinator broker-05:6667 for group group-2.
> > > > > > >>>>
> > > > > > >>>> Please upgrade to Streams 0.10.2.1 -- we fixed couple of bug
> > > and I
> > > > > > would
> > > > > > >>>> assume this issue is fixed, too. If not, please report back.
> > > > > > >>>>
> > > > > > >>>>> Another question that I have is, is there a way for us
> detect
> > > how
> > > > > > many
> > > > > > >>>>> messages have come out of order? And if possible, what is
> the
> > > > > delay?
> > > > > > >>>>
> > > > > > >>>> There is no metric or api for this. What you could do though
> > is,
> > > > to
> > > > > > use
> > > > > > >>>> #transform() that only forwards each record and as a side
> > task,
> > > > > > extracts
> > > > > > >>>> the timestamp via `context#timestamp()` and does some book
> > > keeping
> > > > > to
> > > > > > >>>> compute if out-of-order and what the delay was.
> > > > > > >>>>
> > > > > > >>>>
> > > > > > >>>>>>>  - same for .mapValues()
> > > > > > >>>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>> I am not sure how to check this.
> > > > > > >>>>
> > > > > > >>>> The same way as you do for filter()?
> > > > > > >>>>
> > > > > > >>>>
> > > > > > >>>> -Matthias
> > > > > > >>>>
> > > > > > >>>>
> > > > > > >>>> On 5/4/17 10:29 AM, Mahendra Kariya wrote:
> > > > > > >>>>> Hi Matthias,
> > > > > > >>>>>
> > > > > > >>>>> Please find the answers below.
> > > > > > >>>>>
> > > > > > >>>>> I would recommend to double check the following:
> > > > > > >>>>>>
> > > > > > >>>>>>  - can you confirm that the filter does not remove all
> data
> > > for
> > > > > > those
> > > > > > >>>>>> time periods?
> > > > > > >>>>>>
> > > > > > >>>>>
> > > > > > >>>>> Filter does not remove all data. There is a lot of data
> > coming
> > > in
> > > > > > even
> > > > > > >>>>> after the filter stage.
> > > > > > >>>>>
> > > > > > >>>>>
> > > > > > >>>>>>  - I would also check input for your AggregatorFunction()
> --
> > > > does
> > > > > it
> > > > > > >>>>>> receive everything?
> > > > > > >>>>>>
> > > > > > >>>>>
> > > > > > >>>>> Yes. Aggregate function seems to be receiving everything.
> > > > > > >>>>>
> > > > > > >>>>>
> > > > > > >>>>>>  - same for .mapValues()
> > > > > > >>>>>>
> > > > > > >>>>>
> > > > > > >>>>> I am not sure how to check this.
> > > > > > >>>>>
> > > > > > >>>>
> > > > > > >>>>
> > > > > > >>>
> > > > > > >>
> > > > > > >>
> > > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Re: Debugging Kafka Streams Windowing

Posted by Mahendra Kariya <ma...@go-jek.com>.
I am confused. If what you have mentioned is the case, then

   - Why would restarting the stream processes resolve the issue?
   - Why do we get these infinite stream of exceptions only on some boxes
   in the cluster and not all?
   - We have tens of other consumers running just fine. We see this issue
   only in the streams one.




On Tue, May 16, 2017 at 3:36 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Sorry I mis-read your email and confused it with another thread.
>
> As for your observed issue, it seems "broker-05:6667" is in an unstable
> state which is the group coordinator for this stream process app with app
> id (i.e. group id) "grp_id". Since the streams app cannot commit offsets
> anymore due to group coordinator not available, it cannot proceed but
> repeatedly re-discovers the coordinator.
>
> This is not generally an issue for streams, but for consumer group
> membership management. In practice you need to make sure that the offset
> topic is replicate (I think by default it is 3 replicas) so that whenever
> the leader of a certain offset topic partition, hence the group
> coordinator, fails, another broker can take over so that any consumer
> groups that is corresponding to that offset topic partition won't be
> blocked.
>
>
> Guozhang
>
>
>
> On Mon, May 15, 2017 at 7:33 PM, Mahendra Kariya <
> mahendra.kariya@go-jek.com
> > wrote:
>
> > Thanks for the reply Guozhang! But I think we are talking of 2 different
> > issues here. KAFKA-5167 is for LockException. We face this issue
> > intermittently, but not a lot.
> >
> > There is also another issue where a particular broker is marked as dead
> for
> > a group id and Streams process never recovers from this exception.
> >
> > On Mon, May 15, 2017 at 11:28 PM, Guozhang Wang <wa...@gmail.com>
> > wrote:
> >
> > > I'm wondering if it is possibly due to KAFKA-5167? In that case, the
> > "other
> > > thread" will keep retrying on grabbing the lock.
> > >
> > > Guozhang
> > >
> > >
> > > On Sat, May 13, 2017 at 7:30 PM, Mahendra Kariya <
> > > mahendra.kariya@go-jek.com
> > > > wrote:
> > >
> > > > Hi,
> > > >
> > > > There is no missing data. But the INFO level logs are infinite and
> the
> > > > streams practically stops. For the messages that I posted, we got
> these
> > > > INFO logs for around 20 mins. After which we got an alert about no
> data
> > > > being produced in the sink topic and we had to restart the streams
> > > > processes.
> > > >
> > > >
> > > >
> > > > On Sun, May 14, 2017 at 1:01 AM, Matthias J. Sax <
> > matthias@confluent.io>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I just dug a little bit. The messages are logged at INFO level and
> > thus
> > > > > should not be a problem if they go away by themselves after some
> > time.
> > > > > Compare:
> > > > > https://groups.google.com/forum/#!topic/confluent-
> > platform/A14dkPlDlv4
> > > > >
> > > > > Do you still see missing data?
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > >
> > > > > On 5/11/17 2:39 AM, Mahendra Kariya wrote:
> > > > > > Hi Matthias,
> > > > > >
> > > > > > We faced the issue again. The logs are below.
> > > > > >
> > > > > > 16:13:16.527 [StreamThread-7] INFO o.a.k.c.c.i.
> AbstractCoordinator
> > -
> > > > > > Marking the coordinator broker-05:6667 (id: 2147483642
> > > > <(214)%20748-3642> rack: null) dead
> > > > > for
> > > > > > group grp_id
> > > > > > 16:13:16.543 [StreamThread-3] INFO o.a.k.c.c.i.
> AbstractCoordinator
> > -
> > > > > > Discovered coordinator broker-05:6667 (id: 2147483642
> > > > <(214)%20748-3642> rack: null) for
> > > > > group
> > > > > > grp_id.
> > > > > > 16:13:16.543 [StreamThread-3] INFO o.a.k.c.c.i.
> AbstractCoordinator
> > -
> > > > > > Marking the coordinator broker-05:6667 (id: 2147483642
> > > > <(214)%20748-3642> rack: null) dead
> > > > > for
> > > > > > group grp_id
> > > > > > 16:13:16.547 [StreamThread-6] INFO o.a.k.c.c.i.
> AbstractCoordinator
> > -
> > > > > > Discovered coordinator broker-05:6667 (id: 2147483642
> > > > <(214)%20748-3642> rack: null) for
> > > > > group
> > > > > > grp_id.
> > > > > > 16:13:16.547 [StreamThread-6] INFO o.a.k.c.c.i.
> AbstractCoordinator
> > -
> > > > > > Marking the coordinator broker-05:6667 (id: 2147483642
> > > > <(214)%20748-3642> rack: null) dead
> > > > > for
> > > > > > group grp_id
> > > > > > 16:13:16.551 [StreamThread-1] INFO o.a.k.c.c.i.
> AbstractCoordinator
> > -
> > > > > > Discovered coordinator broker-05:6667 (id: 2147483642
> > > > <(214)%20748-3642> rack: null) for
> > > > > group
> > > > > > grp_id.
> > > > > > 16:13:16.551 [StreamThread-1] INFO o.a.k.c.c.i.
> AbstractCoordinator
> > -
> > > > > > Marking the coordinator broker-05:6667 (id: 2147483642
> > > > <(214)%20748-3642> rack: null) dead
> > > > > for
> > > > > > group grp_id
> > > > > > 16:13:16.572 [StreamThread-4] INFO o.a.k.c.c.i.
> AbstractCoordinator
> > -
> > > > > > Discovered coordinator broker-05:6667 (id: 2147483642
> > > > <(214)%20748-3642> rack: null) for
> > > > > group
> > > > > > grp_id.
> > > > > > 16:13:16.572 [StreamThread-4] INFO o.a.k.c.c.i.
> AbstractCoordinator
> > -
> > > > > > Marking the coordinator broker-05:6667 (id: 2147483642
> > > > <(214)%20748-3642> rack: null) dead
> > > > > for
> > > > > > group grp_id
> > > > > > 16:13:16.573 [StreamThread-2] INFO o.a.k.c.c.i.
> AbstractCoordinator
> > -
> > > > > > Discovered coordinator broker-05:6667 (id: 2147483642
> > > > <(214)%20748-3642> rack: null) for
> > > > > group
> > > > > > grp_id.
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Tue, May 9, 2017 at 3:40 AM, Matthias J. Sax <
> > > matthias@confluent.io
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > >> Great! Glad 0.10.2.1 fixes it for you!
> > > > > >>
> > > > > >> -Matthias
> > > > > >>
> > > > > >> On 5/7/17 8:57 PM, Mahendra Kariya wrote:
> > > > > >>> Upgrading to 0.10.2.1 seems to have fixed the issue.
> > > > > >>>
> > > > > >>> Until now, we were looking at random 1 hour data to analyse the
> > > > issue.
> > > > > >> Over
> > > > > >>> the weekend, we have written a simple test that will
> continuously
> > > > check
> > > > > >> for
> > > > > >>> inconsistencies in real time and report if there is any issue.
> > > > > >>>
> > > > > >>> No issues have been reported for the last 24 hours. Will update
> > > this
> > > > > >> thread
> > > > > >>> if we find any issue.
> > > > > >>>
> > > > > >>> Thanks for all the support!
> > > > > >>>
> > > > > >>>
> > > > > >>>
> > > > > >>> On Fri, May 5, 2017 at 3:55 AM, Matthias J. Sax <
> > > > matthias@confluent.io
> > > > > >
> > > > > >>> wrote:
> > > > > >>>
> > > > > >>>> About
> > > > > >>>>
> > > > > >>>>> 07:44:08.493 [StreamThread-10] INFO
> > > o.a.k.c.c.i.AbstractCoordinato
> > > > r
> > > > > -
> > > > > >>>>> Discovered coordinator broker-05:6667 for group group-2.
> > > > > >>>>
> > > > > >>>> Please upgrade to Streams 0.10.2.1 -- we fixed couple of bug
> > and I
> > > > > would
> > > > > >>>> assume this issue is fixed, too. If not, please report back.
> > > > > >>>>
> > > > > >>>>> Another question that I have is, is there a way for us detect
> > how
> > > > > many
> > > > > >>>>> messages have come out of order? And if possible, what is the
> > > > delay?
> > > > > >>>>
> > > > > >>>> There is no metric or api for this. What you could do though
> is,
> > > to
> > > > > use
> > > > > >>>> #transform() that only forwards each record and as a side
> task,
> > > > > extracts
> > > > > >>>> the timestamp via `context#timestamp()` and does some book
> > keeping
> > > > to
> > > > > >>>> compute if out-of-order and what the delay was.
> > > > > >>>>
> > > > > >>>>
> > > > > >>>>>>>  - same for .mapValues()
> > > > > >>>>>>>
> > > > > >>>>>>
> > > > > >>>>>> I am not sure how to check this.
> > > > > >>>>
> > > > > >>>> The same way as you do for filter()?
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> -Matthias
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> On 5/4/17 10:29 AM, Mahendra Kariya wrote:
> > > > > >>>>> Hi Matthias,
> > > > > >>>>>
> > > > > >>>>> Please find the answers below.
> > > > > >>>>>
> > > > > >>>>> I would recommend to double check the following:
> > > > > >>>>>>
> > > > > >>>>>>  - can you confirm that the filter does not remove all data
> > for
> > > > > those
> > > > > >>>>>> time periods?
> > > > > >>>>>>
> > > > > >>>>>
> > > > > >>>>> Filter does not remove all data. There is a lot of data
> coming
> > in
> > > > > even
> > > > > >>>>> after the filter stage.
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>>>  - I would also check input for your AggregatorFunction() --
> > > does
> > > > it
> > > > > >>>>>> receive everything?
> > > > > >>>>>>
> > > > > >>>>>
> > > > > >>>>> Yes. Aggregate function seems to be receiving everything.
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>>>  - same for .mapValues()
> > > > > >>>>>>
> > > > > >>>>>
> > > > > >>>>> I am not sure how to check this.
> > > > > >>>>>
> > > > > >>>>
> > > > > >>>>
> > > > > >>>
> > > > > >>
> > > > > >>
> > > > > >
> > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: Debugging Kafka Streams Windowing

Posted by Guozhang Wang <wa...@gmail.com>.
Sorry I mis-read your email and confused it with another thread.

As for your observed issue, it seems "broker-05:6667" is in an unstable
state which is the group coordinator for this stream process app with app
id (i.e. group id) "grp_id". Since the streams app cannot commit offsets
anymore due to group coordinator not available, it cannot proceed but
repeatedly re-discovers the coordinator.

This is not generally an issue for streams, but for consumer group
membership management. In practice you need to make sure that the offset
topic is replicate (I think by default it is 3 replicas) so that whenever
the leader of a certain offset topic partition, hence the group
coordinator, fails, another broker can take over so that any consumer
groups that is corresponding to that offset topic partition won't be
blocked.


Guozhang



On Mon, May 15, 2017 at 7:33 PM, Mahendra Kariya <mahendra.kariya@go-jek.com
> wrote:

> Thanks for the reply Guozhang! But I think we are talking of 2 different
> issues here. KAFKA-5167 is for LockException. We face this issue
> intermittently, but not a lot.
>
> There is also another issue where a particular broker is marked as dead for
> a group id and Streams process never recovers from this exception.
>
> On Mon, May 15, 2017 at 11:28 PM, Guozhang Wang <wa...@gmail.com>
> wrote:
>
> > I'm wondering if it is possibly due to KAFKA-5167? In that case, the
> "other
> > thread" will keep retrying on grabbing the lock.
> >
> > Guozhang
> >
> >
> > On Sat, May 13, 2017 at 7:30 PM, Mahendra Kariya <
> > mahendra.kariya@go-jek.com
> > > wrote:
> >
> > > Hi,
> > >
> > > There is no missing data. But the INFO level logs are infinite and the
> > > streams practically stops. For the messages that I posted, we got these
> > > INFO logs for around 20 mins. After which we got an alert about no data
> > > being produced in the sink topic and we had to restart the streams
> > > processes.
> > >
> > >
> > >
> > > On Sun, May 14, 2017 at 1:01 AM, Matthias J. Sax <
> matthias@confluent.io>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I just dug a little bit. The messages are logged at INFO level and
> thus
> > > > should not be a problem if they go away by themselves after some
> time.
> > > > Compare:
> > > > https://groups.google.com/forum/#!topic/confluent-
> platform/A14dkPlDlv4
> > > >
> > > > Do you still see missing data?
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > > On 5/11/17 2:39 AM, Mahendra Kariya wrote:
> > > > > Hi Matthias,
> > > > >
> > > > > We faced the issue again. The logs are below.
> > > > >
> > > > > 16:13:16.527 [StreamThread-7] INFO o.a.k.c.c.i.AbstractCoordinator
> -
> > > > > Marking the coordinator broker-05:6667 (id: 2147483642
> > > <(214)%20748-3642> rack: null) dead
> > > > for
> > > > > group grp_id
> > > > > 16:13:16.543 [StreamThread-3] INFO o.a.k.c.c.i.AbstractCoordinator
> -
> > > > > Discovered coordinator broker-05:6667 (id: 2147483642
> > > <(214)%20748-3642> rack: null) for
> > > > group
> > > > > grp_id.
> > > > > 16:13:16.543 [StreamThread-3] INFO o.a.k.c.c.i.AbstractCoordinator
> -
> > > > > Marking the coordinator broker-05:6667 (id: 2147483642
> > > <(214)%20748-3642> rack: null) dead
> > > > for
> > > > > group grp_id
> > > > > 16:13:16.547 [StreamThread-6] INFO o.a.k.c.c.i.AbstractCoordinator
> -
> > > > > Discovered coordinator broker-05:6667 (id: 2147483642
> > > <(214)%20748-3642> rack: null) for
> > > > group
> > > > > grp_id.
> > > > > 16:13:16.547 [StreamThread-6] INFO o.a.k.c.c.i.AbstractCoordinator
> -
> > > > > Marking the coordinator broker-05:6667 (id: 2147483642
> > > <(214)%20748-3642> rack: null) dead
> > > > for
> > > > > group grp_id
> > > > > 16:13:16.551 [StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator
> -
> > > > > Discovered coordinator broker-05:6667 (id: 2147483642
> > > <(214)%20748-3642> rack: null) for
> > > > group
> > > > > grp_id.
> > > > > 16:13:16.551 [StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator
> -
> > > > > Marking the coordinator broker-05:6667 (id: 2147483642
> > > <(214)%20748-3642> rack: null) dead
> > > > for
> > > > > group grp_id
> > > > > 16:13:16.572 [StreamThread-4] INFO o.a.k.c.c.i.AbstractCoordinator
> -
> > > > > Discovered coordinator broker-05:6667 (id: 2147483642
> > > <(214)%20748-3642> rack: null) for
> > > > group
> > > > > grp_id.
> > > > > 16:13:16.572 [StreamThread-4] INFO o.a.k.c.c.i.AbstractCoordinator
> -
> > > > > Marking the coordinator broker-05:6667 (id: 2147483642
> > > <(214)%20748-3642> rack: null) dead
> > > > for
> > > > > group grp_id
> > > > > 16:13:16.573 [StreamThread-2] INFO o.a.k.c.c.i.AbstractCoordinator
> -
> > > > > Discovered coordinator broker-05:6667 (id: 2147483642
> > > <(214)%20748-3642> rack: null) for
> > > > group
> > > > > grp_id.
> > > > >
> > > > >
> > > > >
> > > > > On Tue, May 9, 2017 at 3:40 AM, Matthias J. Sax <
> > matthias@confluent.io
> > > >
> > > > > wrote:
> > > > >
> > > > >> Great! Glad 0.10.2.1 fixes it for you!
> > > > >>
> > > > >> -Matthias
> > > > >>
> > > > >> On 5/7/17 8:57 PM, Mahendra Kariya wrote:
> > > > >>> Upgrading to 0.10.2.1 seems to have fixed the issue.
> > > > >>>
> > > > >>> Until now, we were looking at random 1 hour data to analyse the
> > > issue.
> > > > >> Over
> > > > >>> the weekend, we have written a simple test that will continuously
> > > check
> > > > >> for
> > > > >>> inconsistencies in real time and report if there is any issue.
> > > > >>>
> > > > >>> No issues have been reported for the last 24 hours. Will update
> > this
> > > > >> thread
> > > > >>> if we find any issue.
> > > > >>>
> > > > >>> Thanks for all the support!
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> On Fri, May 5, 2017 at 3:55 AM, Matthias J. Sax <
> > > matthias@confluent.io
> > > > >
> > > > >>> wrote:
> > > > >>>
> > > > >>>> About
> > > > >>>>
> > > > >>>>> 07:44:08.493 [StreamThread-10] INFO
> > o.a.k.c.c.i.AbstractCoordinato
> > > r
> > > > -
> > > > >>>>> Discovered coordinator broker-05:6667 for group group-2.
> > > > >>>>
> > > > >>>> Please upgrade to Streams 0.10.2.1 -- we fixed couple of bug
> and I
> > > > would
> > > > >>>> assume this issue is fixed, too. If not, please report back.
> > > > >>>>
> > > > >>>>> Another question that I have is, is there a way for us detect
> how
> > > > many
> > > > >>>>> messages have come out of order? And if possible, what is the
> > > delay?
> > > > >>>>
> > > > >>>> There is no metric or api for this. What you could do though is,
> > to
> > > > use
> > > > >>>> #transform() that only forwards each record and as a side task,
> > > > extracts
> > > > >>>> the timestamp via `context#timestamp()` and does some book
> keeping
> > > to
> > > > >>>> compute if out-of-order and what the delay was.
> > > > >>>>
> > > > >>>>
> > > > >>>>>>>  - same for .mapValues()
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>>> I am not sure how to check this.
> > > > >>>>
> > > > >>>> The same way as you do for filter()?
> > > > >>>>
> > > > >>>>
> > > > >>>> -Matthias
> > > > >>>>
> > > > >>>>
> > > > >>>> On 5/4/17 10:29 AM, Mahendra Kariya wrote:
> > > > >>>>> Hi Matthias,
> > > > >>>>>
> > > > >>>>> Please find the answers below.
> > > > >>>>>
> > > > >>>>> I would recommend to double check the following:
> > > > >>>>>>
> > > > >>>>>>  - can you confirm that the filter does not remove all data
> for
> > > > those
> > > > >>>>>> time periods?
> > > > >>>>>>
> > > > >>>>>
> > > > >>>>> Filter does not remove all data. There is a lot of data coming
> in
> > > > even
> > > > >>>>> after the filter stage.
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>>  - I would also check input for your AggregatorFunction() --
> > does
> > > it
> > > > >>>>>> receive everything?
> > > > >>>>>>
> > > > >>>>>
> > > > >>>>> Yes. Aggregate function seems to be receiving everything.
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>>  - same for .mapValues()
> > > > >>>>>>
> > > > >>>>>
> > > > >>>>> I am not sure how to check this.
> > > > >>>>>
> > > > >>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > > >>
> > > > >
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Re: Debugging Kafka Streams Windowing

Posted by Mahendra Kariya <ma...@go-jek.com>.
Thanks for the reply Guozhang! But I think we are talking of 2 different
issues here. KAFKA-5167 is for LockException. We face this issue
intermittently, but not a lot.

There is also another issue where a particular broker is marked as dead for
a group id and Streams process never recovers from this exception.

On Mon, May 15, 2017 at 11:28 PM, Guozhang Wang <wa...@gmail.com> wrote:

> I'm wondering if it is possibly due to KAFKA-5167? In that case, the "other
> thread" will keep retrying on grabbing the lock.
>
> Guozhang
>
>
> On Sat, May 13, 2017 at 7:30 PM, Mahendra Kariya <
> mahendra.kariya@go-jek.com
> > wrote:
>
> > Hi,
> >
> > There is no missing data. But the INFO level logs are infinite and the
> > streams practically stops. For the messages that I posted, we got these
> > INFO logs for around 20 mins. After which we got an alert about no data
> > being produced in the sink topic and we had to restart the streams
> > processes.
> >
> >
> >
> > On Sun, May 14, 2017 at 1:01 AM, Matthias J. Sax <ma...@confluent.io>
> > wrote:
> >
> > > Hi,
> > >
> > > I just dug a little bit. The messages are logged at INFO level and thus
> > > should not be a problem if they go away by themselves after some time.
> > > Compare:
> > > https://groups.google.com/forum/#!topic/confluent-platform/A14dkPlDlv4
> > >
> > > Do you still see missing data?
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 5/11/17 2:39 AM, Mahendra Kariya wrote:
> > > > Hi Matthias,
> > > >
> > > > We faced the issue again. The logs are below.
> > > >
> > > > 16:13:16.527 [StreamThread-7] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > > Marking the coordinator broker-05:6667 (id: 2147483642
> > <(214)%20748-3642> rack: null) dead
> > > for
> > > > group grp_id
> > > > 16:13:16.543 [StreamThread-3] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > > Discovered coordinator broker-05:6667 (id: 2147483642
> > <(214)%20748-3642> rack: null) for
> > > group
> > > > grp_id.
> > > > 16:13:16.543 [StreamThread-3] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > > Marking the coordinator broker-05:6667 (id: 2147483642
> > <(214)%20748-3642> rack: null) dead
> > > for
> > > > group grp_id
> > > > 16:13:16.547 [StreamThread-6] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > > Discovered coordinator broker-05:6667 (id: 2147483642
> > <(214)%20748-3642> rack: null) for
> > > group
> > > > grp_id.
> > > > 16:13:16.547 [StreamThread-6] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > > Marking the coordinator broker-05:6667 (id: 2147483642
> > <(214)%20748-3642> rack: null) dead
> > > for
> > > > group grp_id
> > > > 16:13:16.551 [StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > > Discovered coordinator broker-05:6667 (id: 2147483642
> > <(214)%20748-3642> rack: null) for
> > > group
> > > > grp_id.
> > > > 16:13:16.551 [StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > > Marking the coordinator broker-05:6667 (id: 2147483642
> > <(214)%20748-3642> rack: null) dead
> > > for
> > > > group grp_id
> > > > 16:13:16.572 [StreamThread-4] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > > Discovered coordinator broker-05:6667 (id: 2147483642
> > <(214)%20748-3642> rack: null) for
> > > group
> > > > grp_id.
> > > > 16:13:16.572 [StreamThread-4] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > > Marking the coordinator broker-05:6667 (id: 2147483642
> > <(214)%20748-3642> rack: null) dead
> > > for
> > > > group grp_id
> > > > 16:13:16.573 [StreamThread-2] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > > Discovered coordinator broker-05:6667 (id: 2147483642
> > <(214)%20748-3642> rack: null) for
> > > group
> > > > grp_id.
> > > >
> > > >
> > > >
> > > > On Tue, May 9, 2017 at 3:40 AM, Matthias J. Sax <
> matthias@confluent.io
> > >
> > > > wrote:
> > > >
> > > >> Great! Glad 0.10.2.1 fixes it for you!
> > > >>
> > > >> -Matthias
> > > >>
> > > >> On 5/7/17 8:57 PM, Mahendra Kariya wrote:
> > > >>> Upgrading to 0.10.2.1 seems to have fixed the issue.
> > > >>>
> > > >>> Until now, we were looking at random 1 hour data to analyse the
> > issue.
> > > >> Over
> > > >>> the weekend, we have written a simple test that will continuously
> > check
> > > >> for
> > > >>> inconsistencies in real time and report if there is any issue.
> > > >>>
> > > >>> No issues have been reported for the last 24 hours. Will update
> this
> > > >> thread
> > > >>> if we find any issue.
> > > >>>
> > > >>> Thanks for all the support!
> > > >>>
> > > >>>
> > > >>>
> > > >>> On Fri, May 5, 2017 at 3:55 AM, Matthias J. Sax <
> > matthias@confluent.io
> > > >
> > > >>> wrote:
> > > >>>
> > > >>>> About
> > > >>>>
> > > >>>>> 07:44:08.493 [StreamThread-10] INFO
> o.a.k.c.c.i.AbstractCoordinato
> > r
> > > -
> > > >>>>> Discovered coordinator broker-05:6667 for group group-2.
> > > >>>>
> > > >>>> Please upgrade to Streams 0.10.2.1 -- we fixed couple of bug and I
> > > would
> > > >>>> assume this issue is fixed, too. If not, please report back.
> > > >>>>
> > > >>>>> Another question that I have is, is there a way for us detect how
> > > many
> > > >>>>> messages have come out of order? And if possible, what is the
> > delay?
> > > >>>>
> > > >>>> There is no metric or api for this. What you could do though is,
> to
> > > use
> > > >>>> #transform() that only forwards each record and as a side task,
> > > extracts
> > > >>>> the timestamp via `context#timestamp()` and does some book keeping
> > to
> > > >>>> compute if out-of-order and what the delay was.
> > > >>>>
> > > >>>>
> > > >>>>>>>  - same for .mapValues()
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>> I am not sure how to check this.
> > > >>>>
> > > >>>> The same way as you do for filter()?
> > > >>>>
> > > >>>>
> > > >>>> -Matthias
> > > >>>>
> > > >>>>
> > > >>>> On 5/4/17 10:29 AM, Mahendra Kariya wrote:
> > > >>>>> Hi Matthias,
> > > >>>>>
> > > >>>>> Please find the answers below.
> > > >>>>>
> > > >>>>> I would recommend to double check the following:
> > > >>>>>>
> > > >>>>>>  - can you confirm that the filter does not remove all data for
> > > those
> > > >>>>>> time periods?
> > > >>>>>>
> > > >>>>>
> > > >>>>> Filter does not remove all data. There is a lot of data coming in
> > > even
> > > >>>>> after the filter stage.
> > > >>>>>
> > > >>>>>
> > > >>>>>>  - I would also check input for your AggregatorFunction() --
> does
> > it
> > > >>>>>> receive everything?
> > > >>>>>>
> > > >>>>>
> > > >>>>> Yes. Aggregate function seems to be receiving everything.
> > > >>>>>
> > > >>>>>
> > > >>>>>>  - same for .mapValues()
> > > >>>>>>
> > > >>>>>
> > > >>>>> I am not sure how to check this.
> > > >>>>>
> > > >>>>
> > > >>>>
> > > >>>
> > > >>
> > > >>
> > > >
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: Debugging Kafka Streams Windowing

Posted by Guozhang Wang <wa...@gmail.com>.
I'm wondering if it is possibly due to KAFKA-5167? In that case, the "other
thread" will keep retrying on grabbing the lock.

Guozhang


On Sat, May 13, 2017 at 7:30 PM, Mahendra Kariya <mahendra.kariya@go-jek.com
> wrote:

> Hi,
>
> There is no missing data. But the INFO level logs are infinite and the
> streams practically stops. For the messages that I posted, we got these
> INFO logs for around 20 mins. After which we got an alert about no data
> being produced in the sink topic and we had to restart the streams
> processes.
>
>
>
> On Sun, May 14, 2017 at 1:01 AM, Matthias J. Sax <ma...@confluent.io>
> wrote:
>
> > Hi,
> >
> > I just dug a little bit. The messages are logged at INFO level and thus
> > should not be a problem if they go away by themselves after some time.
> > Compare:
> > https://groups.google.com/forum/#!topic/confluent-platform/A14dkPlDlv4
> >
> > Do you still see missing data?
> >
> >
> > -Matthias
> >
> >
> > On 5/11/17 2:39 AM, Mahendra Kariya wrote:
> > > Hi Matthias,
> > >
> > > We faced the issue again. The logs are below.
> > >
> > > 16:13:16.527 [StreamThread-7] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > Marking the coordinator broker-05:6667 (id: 2147483642
> <(214)%20748-3642> rack: null) dead
> > for
> > > group grp_id
> > > 16:13:16.543 [StreamThread-3] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > Discovered coordinator broker-05:6667 (id: 2147483642
> <(214)%20748-3642> rack: null) for
> > group
> > > grp_id.
> > > 16:13:16.543 [StreamThread-3] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > Marking the coordinator broker-05:6667 (id: 2147483642
> <(214)%20748-3642> rack: null) dead
> > for
> > > group grp_id
> > > 16:13:16.547 [StreamThread-6] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > Discovered coordinator broker-05:6667 (id: 2147483642
> <(214)%20748-3642> rack: null) for
> > group
> > > grp_id.
> > > 16:13:16.547 [StreamThread-6] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > Marking the coordinator broker-05:6667 (id: 2147483642
> <(214)%20748-3642> rack: null) dead
> > for
> > > group grp_id
> > > 16:13:16.551 [StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > Discovered coordinator broker-05:6667 (id: 2147483642
> <(214)%20748-3642> rack: null) for
> > group
> > > grp_id.
> > > 16:13:16.551 [StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > Marking the coordinator broker-05:6667 (id: 2147483642
> <(214)%20748-3642> rack: null) dead
> > for
> > > group grp_id
> > > 16:13:16.572 [StreamThread-4] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > Discovered coordinator broker-05:6667 (id: 2147483642
> <(214)%20748-3642> rack: null) for
> > group
> > > grp_id.
> > > 16:13:16.572 [StreamThread-4] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > Marking the coordinator broker-05:6667 (id: 2147483642
> <(214)%20748-3642> rack: null) dead
> > for
> > > group grp_id
> > > 16:13:16.573 [StreamThread-2] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > Discovered coordinator broker-05:6667 (id: 2147483642
> <(214)%20748-3642> rack: null) for
> > group
> > > grp_id.
> > >
> > >
> > >
> > > On Tue, May 9, 2017 at 3:40 AM, Matthias J. Sax <matthias@confluent.io
> >
> > > wrote:
> > >
> > >> Great! Glad 0.10.2.1 fixes it for you!
> > >>
> > >> -Matthias
> > >>
> > >> On 5/7/17 8:57 PM, Mahendra Kariya wrote:
> > >>> Upgrading to 0.10.2.1 seems to have fixed the issue.
> > >>>
> > >>> Until now, we were looking at random 1 hour data to analyse the
> issue.
> > >> Over
> > >>> the weekend, we have written a simple test that will continuously
> check
> > >> for
> > >>> inconsistencies in real time and report if there is any issue.
> > >>>
> > >>> No issues have been reported for the last 24 hours. Will update this
> > >> thread
> > >>> if we find any issue.
> > >>>
> > >>> Thanks for all the support!
> > >>>
> > >>>
> > >>>
> > >>> On Fri, May 5, 2017 at 3:55 AM, Matthias J. Sax <
> matthias@confluent.io
> > >
> > >>> wrote:
> > >>>
> > >>>> About
> > >>>>
> > >>>>> 07:44:08.493 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinato
> r
> > -
> > >>>>> Discovered coordinator broker-05:6667 for group group-2.
> > >>>>
> > >>>> Please upgrade to Streams 0.10.2.1 -- we fixed couple of bug and I
> > would
> > >>>> assume this issue is fixed, too. If not, please report back.
> > >>>>
> > >>>>> Another question that I have is, is there a way for us detect how
> > many
> > >>>>> messages have come out of order? And if possible, what is the
> delay?
> > >>>>
> > >>>> There is no metric or api for this. What you could do though is, to
> > use
> > >>>> #transform() that only forwards each record and as a side task,
> > extracts
> > >>>> the timestamp via `context#timestamp()` and does some book keeping
> to
> > >>>> compute if out-of-order and what the delay was.
> > >>>>
> > >>>>
> > >>>>>>>  - same for .mapValues()
> > >>>>>>>
> > >>>>>>
> > >>>>>> I am not sure how to check this.
> > >>>>
> > >>>> The same way as you do for filter()?
> > >>>>
> > >>>>
> > >>>> -Matthias
> > >>>>
> > >>>>
> > >>>> On 5/4/17 10:29 AM, Mahendra Kariya wrote:
> > >>>>> Hi Matthias,
> > >>>>>
> > >>>>> Please find the answers below.
> > >>>>>
> > >>>>> I would recommend to double check the following:
> > >>>>>>
> > >>>>>>  - can you confirm that the filter does not remove all data for
> > those
> > >>>>>> time periods?
> > >>>>>>
> > >>>>>
> > >>>>> Filter does not remove all data. There is a lot of data coming in
> > even
> > >>>>> after the filter stage.
> > >>>>>
> > >>>>>
> > >>>>>>  - I would also check input for your AggregatorFunction() -- does
> it
> > >>>>>> receive everything?
> > >>>>>>
> > >>>>>
> > >>>>> Yes. Aggregate function seems to be receiving everything.
> > >>>>>
> > >>>>>
> > >>>>>>  - same for .mapValues()
> > >>>>>>
> > >>>>>
> > >>>>> I am not sure how to check this.
> > >>>>>
> > >>>>
> > >>>>
> > >>>
> > >>
> > >>
> > >
> >
> >
>



-- 
-- Guozhang

Re: Debugging Kafka Streams Windowing

Posted by Mahendra Kariya <ma...@go-jek.com>.
Hi,

There is no missing data. But the INFO level logs are infinite and the
streams practically stops. For the messages that I posted, we got these
INFO logs for around 20 mins. After which we got an alert about no data
being produced in the sink topic and we had to restart the streams
processes.



On Sun, May 14, 2017 at 1:01 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Hi,
>
> I just dug a little bit. The messages are logged at INFO level and thus
> should not be a problem if they go away by themselves after some time.
> Compare:
> https://groups.google.com/forum/#!topic/confluent-platform/A14dkPlDlv4
>
> Do you still see missing data?
>
>
> -Matthias
>
>
> On 5/11/17 2:39 AM, Mahendra Kariya wrote:
> > Hi Matthias,
> >
> > We faced the issue again. The logs are below.
> >
> > 16:13:16.527 [StreamThread-7] INFO o.a.k.c.c.i.AbstractCoordinator -
> > Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead
> for
> > group grp_id
> > 16:13:16.543 [StreamThread-3] INFO o.a.k.c.c.i.AbstractCoordinator -
> > Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for
> group
> > grp_id.
> > 16:13:16.543 [StreamThread-3] INFO o.a.k.c.c.i.AbstractCoordinator -
> > Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead
> for
> > group grp_id
> > 16:13:16.547 [StreamThread-6] INFO o.a.k.c.c.i.AbstractCoordinator -
> > Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for
> group
> > grp_id.
> > 16:13:16.547 [StreamThread-6] INFO o.a.k.c.c.i.AbstractCoordinator -
> > Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead
> for
> > group grp_id
> > 16:13:16.551 [StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator -
> > Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for
> group
> > grp_id.
> > 16:13:16.551 [StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator -
> > Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead
> for
> > group grp_id
> > 16:13:16.572 [StreamThread-4] INFO o.a.k.c.c.i.AbstractCoordinator -
> > Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for
> group
> > grp_id.
> > 16:13:16.572 [StreamThread-4] INFO o.a.k.c.c.i.AbstractCoordinator -
> > Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead
> for
> > group grp_id
> > 16:13:16.573 [StreamThread-2] INFO o.a.k.c.c.i.AbstractCoordinator -
> > Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for
> group
> > grp_id.
> >
> >
> >
> > On Tue, May 9, 2017 at 3:40 AM, Matthias J. Sax <ma...@confluent.io>
> > wrote:
> >
> >> Great! Glad 0.10.2.1 fixes it for you!
> >>
> >> -Matthias
> >>
> >> On 5/7/17 8:57 PM, Mahendra Kariya wrote:
> >>> Upgrading to 0.10.2.1 seems to have fixed the issue.
> >>>
> >>> Until now, we were looking at random 1 hour data to analyse the issue.
> >> Over
> >>> the weekend, we have written a simple test that will continuously check
> >> for
> >>> inconsistencies in real time and report if there is any issue.
> >>>
> >>> No issues have been reported for the last 24 hours. Will update this
> >> thread
> >>> if we find any issue.
> >>>
> >>> Thanks for all the support!
> >>>
> >>>
> >>>
> >>> On Fri, May 5, 2017 at 3:55 AM, Matthias J. Sax <matthias@confluent.io
> >
> >>> wrote:
> >>>
> >>>> About
> >>>>
> >>>>> 07:44:08.493 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator
> -
> >>>>> Discovered coordinator broker-05:6667 for group group-2.
> >>>>
> >>>> Please upgrade to Streams 0.10.2.1 -- we fixed couple of bug and I
> would
> >>>> assume this issue is fixed, too. If not, please report back.
> >>>>
> >>>>> Another question that I have is, is there a way for us detect how
> many
> >>>>> messages have come out of order? And if possible, what is the delay?
> >>>>
> >>>> There is no metric or api for this. What you could do though is, to
> use
> >>>> #transform() that only forwards each record and as a side task,
> extracts
> >>>> the timestamp via `context#timestamp()` and does some book keeping to
> >>>> compute if out-of-order and what the delay was.
> >>>>
> >>>>
> >>>>>>>  - same for .mapValues()
> >>>>>>>
> >>>>>>
> >>>>>> I am not sure how to check this.
> >>>>
> >>>> The same way as you do for filter()?
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>> On 5/4/17 10:29 AM, Mahendra Kariya wrote:
> >>>>> Hi Matthias,
> >>>>>
> >>>>> Please find the answers below.
> >>>>>
> >>>>> I would recommend to double check the following:
> >>>>>>
> >>>>>>  - can you confirm that the filter does not remove all data for
> those
> >>>>>> time periods?
> >>>>>>
> >>>>>
> >>>>> Filter does not remove all data. There is a lot of data coming in
> even
> >>>>> after the filter stage.
> >>>>>
> >>>>>
> >>>>>>  - I would also check input for your AggregatorFunction() -- does it
> >>>>>> receive everything?
> >>>>>>
> >>>>>
> >>>>> Yes. Aggregate function seems to be receiving everything.
> >>>>>
> >>>>>
> >>>>>>  - same for .mapValues()
> >>>>>>
> >>>>>
> >>>>> I am not sure how to check this.
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>

Re: Debugging Kafka Streams Windowing

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Hi,

I just dug a little bit. The messages are logged at INFO level and thus
should not be a problem if they go away by themselves after some time.
Compare:
https://groups.google.com/forum/#!topic/confluent-platform/A14dkPlDlv4

Do you still see missing data?


-Matthias


On 5/11/17 2:39 AM, Mahendra Kariya wrote:
> Hi Matthias,
> 
> We faced the issue again. The logs are below.
> 
> 16:13:16.527 [StreamThread-7] INFO o.a.k.c.c.i.AbstractCoordinator -
> Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for
> group grp_id
> 16:13:16.543 [StreamThread-3] INFO o.a.k.c.c.i.AbstractCoordinator -
> Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group
> grp_id.
> 16:13:16.543 [StreamThread-3] INFO o.a.k.c.c.i.AbstractCoordinator -
> Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for
> group grp_id
> 16:13:16.547 [StreamThread-6] INFO o.a.k.c.c.i.AbstractCoordinator -
> Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group
> grp_id.
> 16:13:16.547 [StreamThread-6] INFO o.a.k.c.c.i.AbstractCoordinator -
> Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for
> group grp_id
> 16:13:16.551 [StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator -
> Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group
> grp_id.
> 16:13:16.551 [StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator -
> Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for
> group grp_id
> 16:13:16.572 [StreamThread-4] INFO o.a.k.c.c.i.AbstractCoordinator -
> Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group
> grp_id.
> 16:13:16.572 [StreamThread-4] INFO o.a.k.c.c.i.AbstractCoordinator -
> Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for
> group grp_id
> 16:13:16.573 [StreamThread-2] INFO o.a.k.c.c.i.AbstractCoordinator -
> Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group
> grp_id.
> 
> 
> 
> On Tue, May 9, 2017 at 3:40 AM, Matthias J. Sax <ma...@confluent.io>
> wrote:
> 
>> Great! Glad 0.10.2.1 fixes it for you!
>>
>> -Matthias
>>
>> On 5/7/17 8:57 PM, Mahendra Kariya wrote:
>>> Upgrading to 0.10.2.1 seems to have fixed the issue.
>>>
>>> Until now, we were looking at random 1 hour data to analyse the issue.
>> Over
>>> the weekend, we have written a simple test that will continuously check
>> for
>>> inconsistencies in real time and report if there is any issue.
>>>
>>> No issues have been reported for the last 24 hours. Will update this
>> thread
>>> if we find any issue.
>>>
>>> Thanks for all the support!
>>>
>>>
>>>
>>> On Fri, May 5, 2017 at 3:55 AM, Matthias J. Sax <ma...@confluent.io>
>>> wrote:
>>>
>>>> About
>>>>
>>>>> 07:44:08.493 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
>>>>> Discovered coordinator broker-05:6667 for group group-2.
>>>>
>>>> Please upgrade to Streams 0.10.2.1 -- we fixed couple of bug and I would
>>>> assume this issue is fixed, too. If not, please report back.
>>>>
>>>>> Another question that I have is, is there a way for us detect how many
>>>>> messages have come out of order? And if possible, what is the delay?
>>>>
>>>> There is no metric or api for this. What you could do though is, to use
>>>> #transform() that only forwards each record and as a side task, extracts
>>>> the timestamp via `context#timestamp()` and does some book keeping to
>>>> compute if out-of-order and what the delay was.
>>>>
>>>>
>>>>>>>  - same for .mapValues()
>>>>>>>
>>>>>>
>>>>>> I am not sure how to check this.
>>>>
>>>> The same way as you do for filter()?
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 5/4/17 10:29 AM, Mahendra Kariya wrote:
>>>>> Hi Matthias,
>>>>>
>>>>> Please find the answers below.
>>>>>
>>>>> I would recommend to double check the following:
>>>>>>
>>>>>>  - can you confirm that the filter does not remove all data for those
>>>>>> time periods?
>>>>>>
>>>>>
>>>>> Filter does not remove all data. There is a lot of data coming in even
>>>>> after the filter stage.
>>>>>
>>>>>
>>>>>>  - I would also check input for your AggregatorFunction() -- does it
>>>>>> receive everything?
>>>>>>
>>>>>
>>>>> Yes. Aggregate function seems to be receiving everything.
>>>>>
>>>>>
>>>>>>  - same for .mapValues()
>>>>>>
>>>>>
>>>>> I am not sure how to check this.
>>>>>
>>>>
>>>>
>>>
>>
>>
> 


Re: Debugging Kafka Streams Windowing

Posted by Mahendra Kariya <ma...@go-jek.com>.
Hi Matthias,

We faced the issue again. The logs are below.

16:13:16.527 [StreamThread-7] INFO o.a.k.c.c.i.AbstractCoordinator -
Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for
group grp_id
16:13:16.543 [StreamThread-3] INFO o.a.k.c.c.i.AbstractCoordinator -
Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group
grp_id.
16:13:16.543 [StreamThread-3] INFO o.a.k.c.c.i.AbstractCoordinator -
Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for
group grp_id
16:13:16.547 [StreamThread-6] INFO o.a.k.c.c.i.AbstractCoordinator -
Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group
grp_id.
16:13:16.547 [StreamThread-6] INFO o.a.k.c.c.i.AbstractCoordinator -
Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for
group grp_id
16:13:16.551 [StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator -
Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group
grp_id.
16:13:16.551 [StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator -
Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for
group grp_id
16:13:16.572 [StreamThread-4] INFO o.a.k.c.c.i.AbstractCoordinator -
Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group
grp_id.
16:13:16.572 [StreamThread-4] INFO o.a.k.c.c.i.AbstractCoordinator -
Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for
group grp_id
16:13:16.573 [StreamThread-2] INFO o.a.k.c.c.i.AbstractCoordinator -
Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group
grp_id.



On Tue, May 9, 2017 at 3:40 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Great! Glad 0.10.2.1 fixes it for you!
>
> -Matthias
>
> On 5/7/17 8:57 PM, Mahendra Kariya wrote:
> > Upgrading to 0.10.2.1 seems to have fixed the issue.
> >
> > Until now, we were looking at random 1 hour data to analyse the issue.
> Over
> > the weekend, we have written a simple test that will continuously check
> for
> > inconsistencies in real time and report if there is any issue.
> >
> > No issues have been reported for the last 24 hours. Will update this
> thread
> > if we find any issue.
> >
> > Thanks for all the support!
> >
> >
> >
> > On Fri, May 5, 2017 at 3:55 AM, Matthias J. Sax <ma...@confluent.io>
> > wrote:
> >
> >> About
> >>
> >>> 07:44:08.493 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
> >>> Discovered coordinator broker-05:6667 for group group-2.
> >>
> >> Please upgrade to Streams 0.10.2.1 -- we fixed couple of bug and I would
> >> assume this issue is fixed, too. If not, please report back.
> >>
> >>> Another question that I have is, is there a way for us detect how many
> >>> messages have come out of order? And if possible, what is the delay?
> >>
> >> There is no metric or api for this. What you could do though is, to use
> >> #transform() that only forwards each record and as a side task, extracts
> >> the timestamp via `context#timestamp()` and does some book keeping to
> >> compute if out-of-order and what the delay was.
> >>
> >>
> >>>>>  - same for .mapValues()
> >>>>>
> >>>>
> >>>> I am not sure how to check this.
> >>
> >> The same way as you do for filter()?
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 5/4/17 10:29 AM, Mahendra Kariya wrote:
> >>> Hi Matthias,
> >>>
> >>> Please find the answers below.
> >>>
> >>> I would recommend to double check the following:
> >>>>
> >>>>  - can you confirm that the filter does not remove all data for those
> >>>> time periods?
> >>>>
> >>>
> >>> Filter does not remove all data. There is a lot of data coming in even
> >>> after the filter stage.
> >>>
> >>>
> >>>>  - I would also check input for your AggregatorFunction() -- does it
> >>>> receive everything?
> >>>>
> >>>
> >>> Yes. Aggregate function seems to be receiving everything.
> >>>
> >>>
> >>>>  - same for .mapValues()
> >>>>
> >>>
> >>> I am not sure how to check this.
> >>>
> >>
> >>
> >
>
>

Re: Debugging Kafka Streams Windowing

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Great! Glad 0.10.2.1 fixes it for you!

-Matthias

On 5/7/17 8:57 PM, Mahendra Kariya wrote:
> Upgrading to 0.10.2.1 seems to have fixed the issue.
> 
> Until now, we were looking at random 1 hour data to analyse the issue. Over
> the weekend, we have written a simple test that will continuously check for
> inconsistencies in real time and report if there is any issue.
> 
> No issues have been reported for the last 24 hours. Will update this thread
> if we find any issue.
> 
> Thanks for all the support!
> 
> 
> 
> On Fri, May 5, 2017 at 3:55 AM, Matthias J. Sax <ma...@confluent.io>
> wrote:
> 
>> About
>>
>>> 07:44:08.493 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
>>> Discovered coordinator broker-05:6667 for group group-2.
>>
>> Please upgrade to Streams 0.10.2.1 -- we fixed couple of bug and I would
>> assume this issue is fixed, too. If not, please report back.
>>
>>> Another question that I have is, is there a way for us detect how many
>>> messages have come out of order? And if possible, what is the delay?
>>
>> There is no metric or api for this. What you could do though is, to use
>> #transform() that only forwards each record and as a side task, extracts
>> the timestamp via `context#timestamp()` and does some book keeping to
>> compute if out-of-order and what the delay was.
>>
>>
>>>>>  - same for .mapValues()
>>>>>
>>>>
>>>> I am not sure how to check this.
>>
>> The same way as you do for filter()?
>>
>>
>> -Matthias
>>
>>
>> On 5/4/17 10:29 AM, Mahendra Kariya wrote:
>>> Hi Matthias,
>>>
>>> Please find the answers below.
>>>
>>> I would recommend to double check the following:
>>>>
>>>>  - can you confirm that the filter does not remove all data for those
>>>> time periods?
>>>>
>>>
>>> Filter does not remove all data. There is a lot of data coming in even
>>> after the filter stage.
>>>
>>>
>>>>  - I would also check input for your AggregatorFunction() -- does it
>>>> receive everything?
>>>>
>>>
>>> Yes. Aggregate function seems to be receiving everything.
>>>
>>>
>>>>  - same for .mapValues()
>>>>
>>>
>>> I am not sure how to check this.
>>>
>>
>>
> 


Re: Debugging Kafka Streams Windowing

Posted by Mahendra Kariya <ma...@go-jek.com>.
Upgrading to 0.10.2.1 seems to have fixed the issue.

Until now, we were looking at random 1 hour data to analyse the issue. Over
the weekend, we have written a simple test that will continuously check for
inconsistencies in real time and report if there is any issue.

No issues have been reported for the last 24 hours. Will update this thread
if we find any issue.

Thanks for all the support!



On Fri, May 5, 2017 at 3:55 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

> About
>
> > 07:44:08.493 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
> > Discovered coordinator broker-05:6667 for group group-2.
>
> Please upgrade to Streams 0.10.2.1 -- we fixed couple of bug and I would
> assume this issue is fixed, too. If not, please report back.
>
> > Another question that I have is, is there a way for us detect how many
> > messages have come out of order? And if possible, what is the delay?
>
> There is no metric or api for this. What you could do though is, to use
> #transform() that only forwards each record and as a side task, extracts
> the timestamp via `context#timestamp()` and does some book keeping to
> compute if out-of-order and what the delay was.
>
>
> >>>  - same for .mapValues()
> >>>
> >>
> >> I am not sure how to check this.
>
> The same way as you do for filter()?
>
>
> -Matthias
>
>
> On 5/4/17 10:29 AM, Mahendra Kariya wrote:
> > Hi Matthias,
> >
> > Please find the answers below.
> >
> > I would recommend to double check the following:
> >>
> >>  - can you confirm that the filter does not remove all data for those
> >> time periods?
> >>
> >
> > Filter does not remove all data. There is a lot of data coming in even
> > after the filter stage.
> >
> >
> >>  - I would also check input for your AggregatorFunction() -- does it
> >> receive everything?
> >>
> >
> > Yes. Aggregate function seems to be receiving everything.
> >
> >
> >>  - same for .mapValues()
> >>
> >
> > I am not sure how to check this.
> >
>
>

Re: Debugging Kafka Streams Windowing

Posted by "Matthias J. Sax" <ma...@confluent.io>.
About

> 07:44:08.493 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
> Discovered coordinator broker-05:6667 for group group-2.

Please upgrade to Streams 0.10.2.1 -- we fixed couple of bug and I would
assume this issue is fixed, too. If not, please report back.

> Another question that I have is, is there a way for us detect how many
> messages have come out of order? And if possible, what is the delay?

There is no metric or api for this. What you could do though is, to use
#transform() that only forwards each record and as a side task, extracts
the timestamp via `context#timestamp()` and does some book keeping to
compute if out-of-order and what the delay was.


>>>  - same for .mapValues()
>>>
>> 
>> I am not sure how to check this.

The same way as you do for filter()?


-Matthias


On 5/4/17 10:29 AM, Mahendra Kariya wrote:
> Hi Matthias,
> 
> Please find the answers below.
> 
> I would recommend to double check the following:
>>
>>  - can you confirm that the filter does not remove all data for those
>> time periods?
>>
> 
> Filter does not remove all data. There is a lot of data coming in even
> after the filter stage.
> 
> 
>>  - I would also check input for your AggregatorFunction() -- does it
>> receive everything?
>>
> 
> Yes. Aggregate function seems to be receiving everything.
> 
> 
>>  - same for .mapValues()
>>
> 
> I am not sure how to check this.
> 


Re: Debugging Kafka Streams Windowing

Posted by Mahendra Kariya <ma...@go-jek.com>.
Another question that I have is, is there a way for us detect how many
messages have come out of order? And if possible, what is the delay?

On Thu, May 4, 2017 at 6:17 AM, Mahendra Kariya <ma...@go-jek.com>
wrote:

> Hi Matthias,
>
> Sure we will look into this. In the meantime, we have run into another
> issue. We have started getting this error frequently rather frequently and
> the Streams app is unable to recover from this.
>
> 07:44:08.493 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
> Discovered coordinator broker-05:6667 for group group-2.
> 07:44:08.494 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
> Marking the coordinator broker-05:6667 dead for group group-2
> 07:44:08.594 [StreamThread-9] INFO o.a.k.c.c.i.AbstractCoordinator -
> Discovered coordinator broker-05:6667 for group group-2.
> 07:44:08.594 [StreamThread-9] INFO o.a.k.c.c.i.AbstractCoordinator -
> Marking the coordinator broker-05:6667 dead for group group-2
> 07:44:08.594 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
> Discovered coordinator broker-05:6667 for group group-2.
> 07:44:08.594 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
> Marking the coordinator broker-05:6667 dead for group group-2
> 07:44:08.694 [StreamThread-9] INFO o.a.k.c.c.i.AbstractCoordinator -
> Discovered coordinator broker-05:6667 for group group-2.
>
>
> On Thu, May 4, 2017 at 4:35 AM, Matthias J. Sax <ma...@confluent.io>
> wrote:
>
>> I would recommend to double check the following:
>>
>>  - can you confirm that the filter does not remove all data for those
>> time periods?
>>  - I would also check input for your AggregatorFunction() -- does it
>> receive everything?
>>  - same for .mapValues()
>>
>> This would help to understand in what part of the program the data gets
>> lost.
>>
>>
>> -Matthias
>>
>>
>> On 5/2/17 11:09 PM, Mahendra Kariya wrote:
>> > Hi Garrett,
>> >
>> > Thanks for these insights. But we are not consuming old data. We want
>> the
>> > Streams app to run in near real time. And that is how it is actually
>> > running. The lag never increases beyond a certain limit. So I don't
>> think
>> > that's an issue.
>> >
>> > The values of the configs that you are mentioning are whatever Kafka
>> offers
>> > by default. So I guess that should be fine.
>> >
>> >
>> >
>> > On Tue, May 2, 2017 at 7:52 PM, Garrett Barton <
>> garrett.barton@gmail.com>
>> > wrote:
>> >
>> >> Mahendra,
>> >>
>> >>  One possible thing I have seen that exhibits the same behavior of
>> missing
>> >> windows of data is the configuration of the topics (internal and your
>> own)
>> >> retention policies.  I was loading data that was fairly old (weeks) and
>> >> using event time semantics as the record timestamp (custom timestamp
>> >> extractor) and the cleanup stuff was deleting segments nearly right
>> after
>> >> they were written.  In my case default cleanup run was every 5
>> minutes, and
>> >> the default retention was 7 days, so every 5 minutes I lost data.  In
>> my
>> >> logs I saw a ton of warnings about 'offset not found' and kafka
>> skipping
>> >> ahead to whatever the next available offset was.  End result was gaps
>> all
>> >> over my data.  I don't have a good fix yet, I set the retention to
>> >> something massive which I think is getting me other problems.
>> >>
>> >> Maybe that helps?
>> >>
>> >> On Tue, May 2, 2017 at 6:27 AM, Mahendra Kariya <
>> >> mahendra.kariya@go-jek.com>
>> >> wrote:
>> >>
>> >>> Hi Matthias,
>> >>>
>> >>> What we did was read the data from sink topic and print it to console.
>> >> And
>> >>> here's the raw data from that topic (the counts are randomized). As we
>> >> can
>> >>> see, the data is certainly missing for some time windows. For
>> instance,
>> >>> after 1493693760, the next timestamp for which the data is present
>> >>> is 1493694300. That's around 9 minutes of data missing.
>> >>>
>> >>> And this is just one instance. There are a lot of such instances in
>> this
>> >>> file.
>> >>>
>> >>>
>> >>>
>> >>> On Sun, Apr 30, 2017 at 11:23 AM, Mahendra Kariya <
>> >>> mahendra.kariya@go-jek.com> wrote:
>> >>>
>> >>>> Thanks for the update Matthias! And sorry for the delayed response.
>> >>>>
>> >>>> The reason we use .aggregate() is because we want to count the
>> number of
>> >>>> unique values for a particular field in the message. So, we just add
>> >> that
>> >>>> particular field's value in the HashSet and then take the size of the
>> >>>> HashSet.
>> >>>>
>> >>>> On our side, we are also investigating and it looks like there might
>> be
>> >> a
>> >>>> bug somewhere in our codebase. If that's the case, then it's quite
>> >> possible
>> >>>> that there is no bug in Kafka Streams, except the metric one.
>> >>>>
>> >>>> We will revert after confirming.
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>> On Sun, Apr 30, 2017 at 10:39 AM, Matthias J. Sax <
>> >> matthias@confluent.io>
>> >>>> wrote:
>> >>>>
>> >>>>> Just a follow up (we identified a bug in the "skipped records"
>> metric).
>> >>>>> The reported value is not correct.
>> >>>>>
>> >>>>>
>> >>>>> On 4/28/17 9:12 PM, Matthias J. Sax wrote:
>> >>>>>> Ok. That makes sense.
>> >>>>>>
>> >>>>>> Question: why do you use .aggregate() instead of .count() ?
>> >>>>>>
>> >>>>>> Also, can you share the code of you AggregatorFunction()? Did you
>> >>>>> change
>> >>>>>> any default setting of StreamsConfig?
>> >>>>>>
>> >>>>>> I have still no idea what could go wrong. Maybe you can run with
>> log
>> >>>>>> level TRACE? Maybe we can get some insight from those.
>> >>>>>>
>> >>>>>>
>> >>>>>> -Matthias
>> >>>>>>
>> >>>>>> On 4/27/17 11:41 PM, Mahendra Kariya wrote:
>> >>>>>>> Oh good point!
>> >>>>>>>
>> >>>>>>> The reason why there is only one row corresponding to each time
>> >>>>> window is
>> >>>>>>> because it only contains the latest value for the time window. So
>> >>>>> what we
>> >>>>>>> did was we just dumped the data present in the sink topic to a db
>> >>>>> using an
>> >>>>>>> upsert query. The primary key of the table was time window. The
>> file
>> >>>>> that I
>> >>>>>>> attached is actually the data present in the DB. And we know that
>> >>>>> there is
>> >>>>>>> no bug in our db dump code because we have been using it for a
>> long
>> >>>>> time in
>> >>>>>>> production without any issues.
>> >>>>>>>
>> >>>>>>> The reason the count is zero for some time windows is because I
>> >>>>> subtracted
>> >>>>>>> a random number the actual values and rounded it off to zero; for
>> >>>>> privacy
>> >>>>>>> reason. The actual data doesn't have any zero values. I should
>> have
>> >>>>>>> mentioned this earlier. My bad!
>> >>>>>>>
>> >>>>>>> The stream topology code looks something like this.
>> >>>>>>>
>> >>>>>>> stream
>> >>>>>>>     .filter()
>> >>>>>>>     .map((key, value) -> new KeyValue<>(transform(key), value)
>> >>>>>>>     .groupByKey()
>> >>>>>>>     .aggregate(HashSet::new, AggregatorFunction(),
>> >>>>>>> TimeWindows.of(60000).until(3600000))
>> >>>>>>>     .mapValues(HashSet::size)
>> >>>>>>>     .toStream()
>> >>>>>>>     .map((key, value) -> convertToProtobufObject(key, value))
>> >>>>>>>     .to()
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On Fri, Apr 28, 2017 at 1:13 PM, Matthias J. Sax <
>> >>>>> matthias@confluent.io>
>> >>>>>>> wrote:
>> >>>>>>>
>> >>>>>>>> Thanks for the details (sorry that I forgot that you did share
>> the
>> >>>>>>>> output already).
>> >>>>>>>>
>> >>>>>>>> Might be a dumb question, but what is the count for missing
>> windows
>> >>>>> in
>> >>>>>>>> your seconds implementation?
>> >>>>>>>>
>> >>>>>>>> If there is no data for a window, it should not emit a window
>> with
>> >>>>> count
>> >>>>>>>> zero, but nothing.
>> >>>>>>>>
>> >>>>>>>> Thus, looking at your output, I am wondering how it could contain
>> >>>>> line
>> >>>>>>>> like:
>> >>>>>>>>
>> >>>>>>>>> 2017-04-27T04:53:00 0
>> >>>>>>>>
>> >>>>>>>> I am also wondering why your output only contains a single value
>> >> per
>> >>>>>>>> window. As Streams outputs multiple updates per window while the
>> >>>>> count
>> >>>>>>>> is increasing, you should actually see multiple records per
>> window.
>> >>>>>>>>
>> >>>>>>>> Your code is like this:
>> >>>>>>>>
>> >>>>>>>> stream.filter().groupByKey().count(TimeWindow.of(60000)).to();
>> >>>>>>>>
>> >>>>>>>> Or do you have something more complex?
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> -Matthias
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On 4/27/17 9:16 PM, Mahendra Kariya wrote:
>> >>>>>>>>>> Can you somehow verify your output?
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> Do you mean the Kafka streams output? In the Kafka Streams
>> output,
>> >>>>> we do
>> >>>>>>>>> see some missing values. I have attached the Kafka Streams
>> output
>> >>>>> (for a
>> >>>>>>>>> few hours) in the very first email of this thread for reference.
>> >>>>>>>>>
>> >>>>>>>>> Let me also summarise what we have done so far.
>> >>>>>>>>>
>> >>>>>>>>> We took a dump of the raw data present in the source topic. We
>> >>>>> wrote a
>> >>>>>>>>> script to read this data and do the exact same aggregations that
>> >> we
>> >>>>> do
>> >>>>>>>>> using Kafka Streams. And then we compared the output from Kafka
>> >>>>> Streams
>> >>>>>>>> and
>> >>>>>>>>> our script.
>> >>>>>>>>>
>> >>>>>>>>> The difference that we observed in the two outputs is that there
>> >>>>> were a
>> >>>>>>>> few
>> >>>>>>>>> rows (corresponding to some time windows) missing in the Streams
>> >>>>> output.
>> >>>>>>>>> For the time windows for which the data was present, the
>> >> aggregated
>> >>>>>>>> numbers
>> >>>>>>>>> matched exactly.
>> >>>>>>>>>
>> >>>>>>>>> This means, either all the records for a particular time window
>> >> are
>> >>>>> being
>> >>>>>>>>> skipped, or none. Now this is highly unlikely to happen. Maybe
>> >>>>> there is a
>> >>>>>>>>> bug somewhere in the rocksdb state stores? Just a speculation,
>> not
>> >>>>> sure
>> >>>>>>>>> though. And there could even be a bug in the reported metric.
>> >>>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>
>> >
>>
>>
>

Re: Debugging Kafka Streams Windowing

Posted by Mahendra Kariya <ma...@go-jek.com>.
Hi Matthias,

Sure we will look into this. In the meantime, we have run into another
issue. We have started getting this error frequently rather frequently and
the Streams app is unable to recover from this.

07:44:08.493 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
Discovered coordinator broker-05:6667 for group group-2.
07:44:08.494 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
Marking the coordinator broker-05:6667 dead for group group-2
07:44:08.594 [StreamThread-9] INFO o.a.k.c.c.i.AbstractCoordinator -
Discovered coordinator broker-05:6667 for group group-2.
07:44:08.594 [StreamThread-9] INFO o.a.k.c.c.i.AbstractCoordinator -
Marking the coordinator broker-05:6667 dead for group group-2
07:44:08.594 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
Discovered coordinator broker-05:6667 for group group-2.
07:44:08.594 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
Marking the coordinator broker-05:6667 dead for group group-2
07:44:08.694 [StreamThread-9] INFO o.a.k.c.c.i.AbstractCoordinator -
Discovered coordinator broker-05:6667 for group group-2.


On Thu, May 4, 2017 at 4:35 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

> I would recommend to double check the following:
>
>  - can you confirm that the filter does not remove all data for those
> time periods?
>  - I would also check input for your AggregatorFunction() -- does it
> receive everything?
>  - same for .mapValues()
>
> This would help to understand in what part of the program the data gets
> lost.
>
>
> -Matthias
>
>
> On 5/2/17 11:09 PM, Mahendra Kariya wrote:
> > Hi Garrett,
> >
> > Thanks for these insights. But we are not consuming old data. We want the
> > Streams app to run in near real time. And that is how it is actually
> > running. The lag never increases beyond a certain limit. So I don't think
> > that's an issue.
> >
> > The values of the configs that you are mentioning are whatever Kafka
> offers
> > by default. So I guess that should be fine.
> >
> >
> >
> > On Tue, May 2, 2017 at 7:52 PM, Garrett Barton <garrett.barton@gmail.com
> >
> > wrote:
> >
> >> Mahendra,
> >>
> >>  One possible thing I have seen that exhibits the same behavior of
> missing
> >> windows of data is the configuration of the topics (internal and your
> own)
> >> retention policies.  I was loading data that was fairly old (weeks) and
> >> using event time semantics as the record timestamp (custom timestamp
> >> extractor) and the cleanup stuff was deleting segments nearly right
> after
> >> they were written.  In my case default cleanup run was every 5 minutes,
> and
> >> the default retention was 7 days, so every 5 minutes I lost data.  In my
> >> logs I saw a ton of warnings about 'offset not found' and kafka skipping
> >> ahead to whatever the next available offset was.  End result was gaps
> all
> >> over my data.  I don't have a good fix yet, I set the retention to
> >> something massive which I think is getting me other problems.
> >>
> >> Maybe that helps?
> >>
> >> On Tue, May 2, 2017 at 6:27 AM, Mahendra Kariya <
> >> mahendra.kariya@go-jek.com>
> >> wrote:
> >>
> >>> Hi Matthias,
> >>>
> >>> What we did was read the data from sink topic and print it to console.
> >> And
> >>> here's the raw data from that topic (the counts are randomized). As we
> >> can
> >>> see, the data is certainly missing for some time windows. For instance,
> >>> after 1493693760, the next timestamp for which the data is present
> >>> is 1493694300. That's around 9 minutes of data missing.
> >>>
> >>> And this is just one instance. There are a lot of such instances in
> this
> >>> file.
> >>>
> >>>
> >>>
> >>> On Sun, Apr 30, 2017 at 11:23 AM, Mahendra Kariya <
> >>> mahendra.kariya@go-jek.com> wrote:
> >>>
> >>>> Thanks for the update Matthias! And sorry for the delayed response.
> >>>>
> >>>> The reason we use .aggregate() is because we want to count the number
> of
> >>>> unique values for a particular field in the message. So, we just add
> >> that
> >>>> particular field's value in the HashSet and then take the size of the
> >>>> HashSet.
> >>>>
> >>>> On our side, we are also investigating and it looks like there might
> be
> >> a
> >>>> bug somewhere in our codebase. If that's the case, then it's quite
> >> possible
> >>>> that there is no bug in Kafka Streams, except the metric one.
> >>>>
> >>>> We will revert after confirming.
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On Sun, Apr 30, 2017 at 10:39 AM, Matthias J. Sax <
> >> matthias@confluent.io>
> >>>> wrote:
> >>>>
> >>>>> Just a follow up (we identified a bug in the "skipped records"
> metric).
> >>>>> The reported value is not correct.
> >>>>>
> >>>>>
> >>>>> On 4/28/17 9:12 PM, Matthias J. Sax wrote:
> >>>>>> Ok. That makes sense.
> >>>>>>
> >>>>>> Question: why do you use .aggregate() instead of .count() ?
> >>>>>>
> >>>>>> Also, can you share the code of you AggregatorFunction()? Did you
> >>>>> change
> >>>>>> any default setting of StreamsConfig?
> >>>>>>
> >>>>>> I have still no idea what could go wrong. Maybe you can run with log
> >>>>>> level TRACE? Maybe we can get some insight from those.
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 4/27/17 11:41 PM, Mahendra Kariya wrote:
> >>>>>>> Oh good point!
> >>>>>>>
> >>>>>>> The reason why there is only one row corresponding to each time
> >>>>> window is
> >>>>>>> because it only contains the latest value for the time window. So
> >>>>> what we
> >>>>>>> did was we just dumped the data present in the sink topic to a db
> >>>>> using an
> >>>>>>> upsert query. The primary key of the table was time window. The
> file
> >>>>> that I
> >>>>>>> attached is actually the data present in the DB. And we know that
> >>>>> there is
> >>>>>>> no bug in our db dump code because we have been using it for a long
> >>>>> time in
> >>>>>>> production without any issues.
> >>>>>>>
> >>>>>>> The reason the count is zero for some time windows is because I
> >>>>> subtracted
> >>>>>>> a random number the actual values and rounded it off to zero; for
> >>>>> privacy
> >>>>>>> reason. The actual data doesn't have any zero values. I should have
> >>>>>>> mentioned this earlier. My bad!
> >>>>>>>
> >>>>>>> The stream topology code looks something like this.
> >>>>>>>
> >>>>>>> stream
> >>>>>>>     .filter()
> >>>>>>>     .map((key, value) -> new KeyValue<>(transform(key), value)
> >>>>>>>     .groupByKey()
> >>>>>>>     .aggregate(HashSet::new, AggregatorFunction(),
> >>>>>>> TimeWindows.of(60000).until(3600000))
> >>>>>>>     .mapValues(HashSet::size)
> >>>>>>>     .toStream()
> >>>>>>>     .map((key, value) -> convertToProtobufObject(key, value))
> >>>>>>>     .to()
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Fri, Apr 28, 2017 at 1:13 PM, Matthias J. Sax <
> >>>>> matthias@confluent.io>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Thanks for the details (sorry that I forgot that you did share the
> >>>>>>>> output already).
> >>>>>>>>
> >>>>>>>> Might be a dumb question, but what is the count for missing
> windows
> >>>>> in
> >>>>>>>> your seconds implementation?
> >>>>>>>>
> >>>>>>>> If there is no data for a window, it should not emit a window with
> >>>>> count
> >>>>>>>> zero, but nothing.
> >>>>>>>>
> >>>>>>>> Thus, looking at your output, I am wondering how it could contain
> >>>>> line
> >>>>>>>> like:
> >>>>>>>>
> >>>>>>>>> 2017-04-27T04:53:00 0
> >>>>>>>>
> >>>>>>>> I am also wondering why your output only contains a single value
> >> per
> >>>>>>>> window. As Streams outputs multiple updates per window while the
> >>>>> count
> >>>>>>>> is increasing, you should actually see multiple records per
> window.
> >>>>>>>>
> >>>>>>>> Your code is like this:
> >>>>>>>>
> >>>>>>>> stream.filter().groupByKey().count(TimeWindow.of(60000)).to();
> >>>>>>>>
> >>>>>>>> Or do you have something more complex?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 4/27/17 9:16 PM, Mahendra Kariya wrote:
> >>>>>>>>>> Can you somehow verify your output?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Do you mean the Kafka streams output? In the Kafka Streams
> output,
> >>>>> we do
> >>>>>>>>> see some missing values. I have attached the Kafka Streams output
> >>>>> (for a
> >>>>>>>>> few hours) in the very first email of this thread for reference.
> >>>>>>>>>
> >>>>>>>>> Let me also summarise what we have done so far.
> >>>>>>>>>
> >>>>>>>>> We took a dump of the raw data present in the source topic. We
> >>>>> wrote a
> >>>>>>>>> script to read this data and do the exact same aggregations that
> >> we
> >>>>> do
> >>>>>>>>> using Kafka Streams. And then we compared the output from Kafka
> >>>>> Streams
> >>>>>>>> and
> >>>>>>>>> our script.
> >>>>>>>>>
> >>>>>>>>> The difference that we observed in the two outputs is that there
> >>>>> were a
> >>>>>>>> few
> >>>>>>>>> rows (corresponding to some time windows) missing in the Streams
> >>>>> output.
> >>>>>>>>> For the time windows for which the data was present, the
> >> aggregated
> >>>>>>>> numbers
> >>>>>>>>> matched exactly.
> >>>>>>>>>
> >>>>>>>>> This means, either all the records for a particular time window
> >> are
> >>>>> being
> >>>>>>>>> skipped, or none. Now this is highly unlikely to happen. Maybe
> >>>>> there is a
> >>>>>>>>> bug somewhere in the rocksdb state stores? Just a speculation,
> not
> >>>>> sure
> >>>>>>>>> though. And there could even be a bug in the reported metric.
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>
>

Re: Debugging Kafka Streams Windowing

Posted by Mahendra Kariya <ma...@go-jek.com>.
Hi Matthias,

Please find the answers below.

I would recommend to double check the following:
>
>  - can you confirm that the filter does not remove all data for those
> time periods?
>

Filter does not remove all data. There is a lot of data coming in even
after the filter stage.


>  - I would also check input for your AggregatorFunction() -- does it
> receive everything?
>

Yes. Aggregate function seems to be receiving everything.


>  - same for .mapValues()
>

I am not sure how to check this.

Re: Debugging Kafka Streams Windowing

Posted by "Matthias J. Sax" <ma...@confluent.io>.
I would recommend to double check the following:

 - can you confirm that the filter does not remove all data for those
time periods?
 - I would also check input for your AggregatorFunction() -- does it
receive everything?
 - same for .mapValues()

This would help to understand in what part of the program the data gets
lost.


-Matthias


On 5/2/17 11:09 PM, Mahendra Kariya wrote:
> Hi Garrett,
> 
> Thanks for these insights. But we are not consuming old data. We want the
> Streams app to run in near real time. And that is how it is actually
> running. The lag never increases beyond a certain limit. So I don't think
> that's an issue.
> 
> The values of the configs that you are mentioning are whatever Kafka offers
> by default. So I guess that should be fine.
> 
> 
> 
> On Tue, May 2, 2017 at 7:52 PM, Garrett Barton <ga...@gmail.com>
> wrote:
> 
>> Mahendra,
>>
>>  One possible thing I have seen that exhibits the same behavior of missing
>> windows of data is the configuration of the topics (internal and your own)
>> retention policies.  I was loading data that was fairly old (weeks) and
>> using event time semantics as the record timestamp (custom timestamp
>> extractor) and the cleanup stuff was deleting segments nearly right after
>> they were written.  In my case default cleanup run was every 5 minutes, and
>> the default retention was 7 days, so every 5 minutes I lost data.  In my
>> logs I saw a ton of warnings about 'offset not found' and kafka skipping
>> ahead to whatever the next available offset was.  End result was gaps all
>> over my data.  I don't have a good fix yet, I set the retention to
>> something massive which I think is getting me other problems.
>>
>> Maybe that helps?
>>
>> On Tue, May 2, 2017 at 6:27 AM, Mahendra Kariya <
>> mahendra.kariya@go-jek.com>
>> wrote:
>>
>>> Hi Matthias,
>>>
>>> What we did was read the data from sink topic and print it to console.
>> And
>>> here's the raw data from that topic (the counts are randomized). As we
>> can
>>> see, the data is certainly missing for some time windows. For instance,
>>> after 1493693760, the next timestamp for which the data is present
>>> is 1493694300. That's around 9 minutes of data missing.
>>>
>>> And this is just one instance. There are a lot of such instances in this
>>> file.
>>>
>>>
>>>
>>> On Sun, Apr 30, 2017 at 11:23 AM, Mahendra Kariya <
>>> mahendra.kariya@go-jek.com> wrote:
>>>
>>>> Thanks for the update Matthias! And sorry for the delayed response.
>>>>
>>>> The reason we use .aggregate() is because we want to count the number of
>>>> unique values for a particular field in the message. So, we just add
>> that
>>>> particular field's value in the HashSet and then take the size of the
>>>> HashSet.
>>>>
>>>> On our side, we are also investigating and it looks like there might be
>> a
>>>> bug somewhere in our codebase. If that's the case, then it's quite
>> possible
>>>> that there is no bug in Kafka Streams, except the metric one.
>>>>
>>>> We will revert after confirming.
>>>>
>>>>
>>>>
>>>>
>>>> On Sun, Apr 30, 2017 at 10:39 AM, Matthias J. Sax <
>> matthias@confluent.io>
>>>> wrote:
>>>>
>>>>> Just a follow up (we identified a bug in the "skipped records" metric).
>>>>> The reported value is not correct.
>>>>>
>>>>>
>>>>> On 4/28/17 9:12 PM, Matthias J. Sax wrote:
>>>>>> Ok. That makes sense.
>>>>>>
>>>>>> Question: why do you use .aggregate() instead of .count() ?
>>>>>>
>>>>>> Also, can you share the code of you AggregatorFunction()? Did you
>>>>> change
>>>>>> any default setting of StreamsConfig?
>>>>>>
>>>>>> I have still no idea what could go wrong. Maybe you can run with log
>>>>>> level TRACE? Maybe we can get some insight from those.
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 4/27/17 11:41 PM, Mahendra Kariya wrote:
>>>>>>> Oh good point!
>>>>>>>
>>>>>>> The reason why there is only one row corresponding to each time
>>>>> window is
>>>>>>> because it only contains the latest value for the time window. So
>>>>> what we
>>>>>>> did was we just dumped the data present in the sink topic to a db
>>>>> using an
>>>>>>> upsert query. The primary key of the table was time window. The file
>>>>> that I
>>>>>>> attached is actually the data present in the DB. And we know that
>>>>> there is
>>>>>>> no bug in our db dump code because we have been using it for a long
>>>>> time in
>>>>>>> production without any issues.
>>>>>>>
>>>>>>> The reason the count is zero for some time windows is because I
>>>>> subtracted
>>>>>>> a random number the actual values and rounded it off to zero; for
>>>>> privacy
>>>>>>> reason. The actual data doesn't have any zero values. I should have
>>>>>>> mentioned this earlier. My bad!
>>>>>>>
>>>>>>> The stream topology code looks something like this.
>>>>>>>
>>>>>>> stream
>>>>>>>     .filter()
>>>>>>>     .map((key, value) -> new KeyValue<>(transform(key), value)
>>>>>>>     .groupByKey()
>>>>>>>     .aggregate(HashSet::new, AggregatorFunction(),
>>>>>>> TimeWindows.of(60000).until(3600000))
>>>>>>>     .mapValues(HashSet::size)
>>>>>>>     .toStream()
>>>>>>>     .map((key, value) -> convertToProtobufObject(key, value))
>>>>>>>     .to()
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Apr 28, 2017 at 1:13 PM, Matthias J. Sax <
>>>>> matthias@confluent.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks for the details (sorry that I forgot that you did share the
>>>>>>>> output already).
>>>>>>>>
>>>>>>>> Might be a dumb question, but what is the count for missing windows
>>>>> in
>>>>>>>> your seconds implementation?
>>>>>>>>
>>>>>>>> If there is no data for a window, it should not emit a window with
>>>>> count
>>>>>>>> zero, but nothing.
>>>>>>>>
>>>>>>>> Thus, looking at your output, I am wondering how it could contain
>>>>> line
>>>>>>>> like:
>>>>>>>>
>>>>>>>>> 2017-04-27T04:53:00 0
>>>>>>>>
>>>>>>>> I am also wondering why your output only contains a single value
>> per
>>>>>>>> window. As Streams outputs multiple updates per window while the
>>>>> count
>>>>>>>> is increasing, you should actually see multiple records per window.
>>>>>>>>
>>>>>>>> Your code is like this:
>>>>>>>>
>>>>>>>> stream.filter().groupByKey().count(TimeWindow.of(60000)).to();
>>>>>>>>
>>>>>>>> Or do you have something more complex?
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>>
>>>>>>>> On 4/27/17 9:16 PM, Mahendra Kariya wrote:
>>>>>>>>>> Can you somehow verify your output?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Do you mean the Kafka streams output? In the Kafka Streams output,
>>>>> we do
>>>>>>>>> see some missing values. I have attached the Kafka Streams output
>>>>> (for a
>>>>>>>>> few hours) in the very first email of this thread for reference.
>>>>>>>>>
>>>>>>>>> Let me also summarise what we have done so far.
>>>>>>>>>
>>>>>>>>> We took a dump of the raw data present in the source topic. We
>>>>> wrote a
>>>>>>>>> script to read this data and do the exact same aggregations that
>> we
>>>>> do
>>>>>>>>> using Kafka Streams. And then we compared the output from Kafka
>>>>> Streams
>>>>>>>> and
>>>>>>>>> our script.
>>>>>>>>>
>>>>>>>>> The difference that we observed in the two outputs is that there
>>>>> were a
>>>>>>>> few
>>>>>>>>> rows (corresponding to some time windows) missing in the Streams
>>>>> output.
>>>>>>>>> For the time windows for which the data was present, the
>> aggregated
>>>>>>>> numbers
>>>>>>>>> matched exactly.
>>>>>>>>>
>>>>>>>>> This means, either all the records for a particular time window
>> are
>>>>> being
>>>>>>>>> skipped, or none. Now this is highly unlikely to happen. Maybe
>>>>> there is a
>>>>>>>>> bug somewhere in the rocksdb state stores? Just a speculation, not
>>>>> sure
>>>>>>>>> though. And there could even be a bug in the reported metric.
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
> 


Re: Debugging Kafka Streams Windowing

Posted by Mahendra Kariya <ma...@go-jek.com>.
Hi Garrett,

Thanks for these insights. But we are not consuming old data. We want the
Streams app to run in near real time. And that is how it is actually
running. The lag never increases beyond a certain limit. So I don't think
that's an issue.

The values of the configs that you are mentioning are whatever Kafka offers
by default. So I guess that should be fine.



On Tue, May 2, 2017 at 7:52 PM, Garrett Barton <ga...@gmail.com>
wrote:

> Mahendra,
>
>  One possible thing I have seen that exhibits the same behavior of missing
> windows of data is the configuration of the topics (internal and your own)
> retention policies.  I was loading data that was fairly old (weeks) and
> using event time semantics as the record timestamp (custom timestamp
> extractor) and the cleanup stuff was deleting segments nearly right after
> they were written.  In my case default cleanup run was every 5 minutes, and
> the default retention was 7 days, so every 5 minutes I lost data.  In my
> logs I saw a ton of warnings about 'offset not found' and kafka skipping
> ahead to whatever the next available offset was.  End result was gaps all
> over my data.  I don't have a good fix yet, I set the retention to
> something massive which I think is getting me other problems.
>
> Maybe that helps?
>
> On Tue, May 2, 2017 at 6:27 AM, Mahendra Kariya <
> mahendra.kariya@go-jek.com>
> wrote:
>
> > Hi Matthias,
> >
> > What we did was read the data from sink topic and print it to console.
> And
> > here's the raw data from that topic (the counts are randomized). As we
> can
> > see, the data is certainly missing for some time windows. For instance,
> > after 1493693760, the next timestamp for which the data is present
> > is 1493694300. That's around 9 minutes of data missing.
> >
> > And this is just one instance. There are a lot of such instances in this
> > file.
> >
> >
> >
> > On Sun, Apr 30, 2017 at 11:23 AM, Mahendra Kariya <
> > mahendra.kariya@go-jek.com> wrote:
> >
> >> Thanks for the update Matthias! And sorry for the delayed response.
> >>
> >> The reason we use .aggregate() is because we want to count the number of
> >> unique values for a particular field in the message. So, we just add
> that
> >> particular field's value in the HashSet and then take the size of the
> >> HashSet.
> >>
> >> On our side, we are also investigating and it looks like there might be
> a
> >> bug somewhere in our codebase. If that's the case, then it's quite
> possible
> >> that there is no bug in Kafka Streams, except the metric one.
> >>
> >> We will revert after confirming.
> >>
> >>
> >>
> >>
> >> On Sun, Apr 30, 2017 at 10:39 AM, Matthias J. Sax <
> matthias@confluent.io>
> >> wrote:
> >>
> >>> Just a follow up (we identified a bug in the "skipped records" metric).
> >>> The reported value is not correct.
> >>>
> >>>
> >>> On 4/28/17 9:12 PM, Matthias J. Sax wrote:
> >>> > Ok. That makes sense.
> >>> >
> >>> > Question: why do you use .aggregate() instead of .count() ?
> >>> >
> >>> > Also, can you share the code of you AggregatorFunction()? Did you
> >>> change
> >>> > any default setting of StreamsConfig?
> >>> >
> >>> > I have still no idea what could go wrong. Maybe you can run with log
> >>> > level TRACE? Maybe we can get some insight from those.
> >>> >
> >>> >
> >>> > -Matthias
> >>> >
> >>> > On 4/27/17 11:41 PM, Mahendra Kariya wrote:
> >>> >> Oh good point!
> >>> >>
> >>> >> The reason why there is only one row corresponding to each time
> >>> window is
> >>> >> because it only contains the latest value for the time window. So
> >>> what we
> >>> >> did was we just dumped the data present in the sink topic to a db
> >>> using an
> >>> >> upsert query. The primary key of the table was time window. The file
> >>> that I
> >>> >> attached is actually the data present in the DB. And we know that
> >>> there is
> >>> >> no bug in our db dump code because we have been using it for a long
> >>> time in
> >>> >> production without any issues.
> >>> >>
> >>> >> The reason the count is zero for some time windows is because I
> >>> subtracted
> >>> >> a random number the actual values and rounded it off to zero; for
> >>> privacy
> >>> >> reason. The actual data doesn't have any zero values. I should have
> >>> >> mentioned this earlier. My bad!
> >>> >>
> >>> >> The stream topology code looks something like this.
> >>> >>
> >>> >> stream
> >>> >>     .filter()
> >>> >>     .map((key, value) -> new KeyValue<>(transform(key), value)
> >>> >>     .groupByKey()
> >>> >>     .aggregate(HashSet::new, AggregatorFunction(),
> >>> >> TimeWindows.of(60000).until(3600000))
> >>> >>     .mapValues(HashSet::size)
> >>> >>     .toStream()
> >>> >>     .map((key, value) -> convertToProtobufObject(key, value))
> >>> >>     .to()
> >>> >>
> >>> >>
> >>> >>
> >>> >>
> >>> >>
> >>> >>
> >>> >> On Fri, Apr 28, 2017 at 1:13 PM, Matthias J. Sax <
> >>> matthias@confluent.io>
> >>> >> wrote:
> >>> >>
> >>> >>> Thanks for the details (sorry that I forgot that you did share the
> >>> >>> output already).
> >>> >>>
> >>> >>> Might be a dumb question, but what is the count for missing windows
> >>> in
> >>> >>> your seconds implementation?
> >>> >>>
> >>> >>> If there is no data for a window, it should not emit a window with
> >>> count
> >>> >>> zero, but nothing.
> >>> >>>
> >>> >>> Thus, looking at your output, I am wondering how it could contain
> >>> line
> >>> >>> like:
> >>> >>>
> >>> >>>> 2017-04-27T04:53:00 0
> >>> >>>
> >>> >>> I am also wondering why your output only contains a single value
> per
> >>> >>> window. As Streams outputs multiple updates per window while the
> >>> count
> >>> >>> is increasing, you should actually see multiple records per window.
> >>> >>>
> >>> >>> Your code is like this:
> >>> >>>
> >>> >>> stream.filter().groupByKey().count(TimeWindow.of(60000)).to();
> >>> >>>
> >>> >>> Or do you have something more complex?
> >>> >>>
> >>> >>>
> >>> >>> -Matthias
> >>> >>>
> >>> >>>
> >>> >>> On 4/27/17 9:16 PM, Mahendra Kariya wrote:
> >>> >>>>> Can you somehow verify your output?
> >>> >>>>
> >>> >>>>
> >>> >>>> Do you mean the Kafka streams output? In the Kafka Streams output,
> >>> we do
> >>> >>>> see some missing values. I have attached the Kafka Streams output
> >>> (for a
> >>> >>>> few hours) in the very first email of this thread for reference.
> >>> >>>>
> >>> >>>> Let me also summarise what we have done so far.
> >>> >>>>
> >>> >>>> We took a dump of the raw data present in the source topic. We
> >>> wrote a
> >>> >>>> script to read this data and do the exact same aggregations that
> we
> >>> do
> >>> >>>> using Kafka Streams. And then we compared the output from Kafka
> >>> Streams
> >>> >>> and
> >>> >>>> our script.
> >>> >>>>
> >>> >>>> The difference that we observed in the two outputs is that there
> >>> were a
> >>> >>> few
> >>> >>>> rows (corresponding to some time windows) missing in the Streams
> >>> output.
> >>> >>>> For the time windows for which the data was present, the
> aggregated
> >>> >>> numbers
> >>> >>>> matched exactly.
> >>> >>>>
> >>> >>>> This means, either all the records for a particular time window
> are
> >>> being
> >>> >>>> skipped, or none. Now this is highly unlikely to happen. Maybe
> >>> there is a
> >>> >>>> bug somewhere in the rocksdb state stores? Just a speculation, not
> >>> sure
> >>> >>>> though. And there could even be a bug in the reported metric.
> >>> >>>>
> >>> >>>
> >>> >>>
> >>> >>
> >>> >
> >>>
> >>>
> >>
> >
>

Re: Debugging Kafka Streams Windowing

Posted by Garrett Barton <ga...@gmail.com>.
Mahendra,

 One possible thing I have seen that exhibits the same behavior of missing
windows of data is the configuration of the topics (internal and your own)
retention policies.  I was loading data that was fairly old (weeks) and
using event time semantics as the record timestamp (custom timestamp
extractor) and the cleanup stuff was deleting segments nearly right after
they were written.  In my case default cleanup run was every 5 minutes, and
the default retention was 7 days, so every 5 minutes I lost data.  In my
logs I saw a ton of warnings about 'offset not found' and kafka skipping
ahead to whatever the next available offset was.  End result was gaps all
over my data.  I don't have a good fix yet, I set the retention to
something massive which I think is getting me other problems.

Maybe that helps?

On Tue, May 2, 2017 at 6:27 AM, Mahendra Kariya <ma...@go-jek.com>
wrote:

> Hi Matthias,
>
> What we did was read the data from sink topic and print it to console. And
> here's the raw data from that topic (the counts are randomized). As we can
> see, the data is certainly missing for some time windows. For instance,
> after 1493693760, the next timestamp for which the data is present
> is 1493694300. That's around 9 minutes of data missing.
>
> And this is just one instance. There are a lot of such instances in this
> file.
>
>
>
> On Sun, Apr 30, 2017 at 11:23 AM, Mahendra Kariya <
> mahendra.kariya@go-jek.com> wrote:
>
>> Thanks for the update Matthias! And sorry for the delayed response.
>>
>> The reason we use .aggregate() is because we want to count the number of
>> unique values for a particular field in the message. So, we just add that
>> particular field's value in the HashSet and then take the size of the
>> HashSet.
>>
>> On our side, we are also investigating and it looks like there might be a
>> bug somewhere in our codebase. If that's the case, then it's quite possible
>> that there is no bug in Kafka Streams, except the metric one.
>>
>> We will revert after confirming.
>>
>>
>>
>>
>> On Sun, Apr 30, 2017 at 10:39 AM, Matthias J. Sax <ma...@confluent.io>
>> wrote:
>>
>>> Just a follow up (we identified a bug in the "skipped records" metric).
>>> The reported value is not correct.
>>>
>>>
>>> On 4/28/17 9:12 PM, Matthias J. Sax wrote:
>>> > Ok. That makes sense.
>>> >
>>> > Question: why do you use .aggregate() instead of .count() ?
>>> >
>>> > Also, can you share the code of you AggregatorFunction()? Did you
>>> change
>>> > any default setting of StreamsConfig?
>>> >
>>> > I have still no idea what could go wrong. Maybe you can run with log
>>> > level TRACE? Maybe we can get some insight from those.
>>> >
>>> >
>>> > -Matthias
>>> >
>>> > On 4/27/17 11:41 PM, Mahendra Kariya wrote:
>>> >> Oh good point!
>>> >>
>>> >> The reason why there is only one row corresponding to each time
>>> window is
>>> >> because it only contains the latest value for the time window. So
>>> what we
>>> >> did was we just dumped the data present in the sink topic to a db
>>> using an
>>> >> upsert query. The primary key of the table was time window. The file
>>> that I
>>> >> attached is actually the data present in the DB. And we know that
>>> there is
>>> >> no bug in our db dump code because we have been using it for a long
>>> time in
>>> >> production without any issues.
>>> >>
>>> >> The reason the count is zero for some time windows is because I
>>> subtracted
>>> >> a random number the actual values and rounded it off to zero; for
>>> privacy
>>> >> reason. The actual data doesn't have any zero values. I should have
>>> >> mentioned this earlier. My bad!
>>> >>
>>> >> The stream topology code looks something like this.
>>> >>
>>> >> stream
>>> >>     .filter()
>>> >>     .map((key, value) -> new KeyValue<>(transform(key), value)
>>> >>     .groupByKey()
>>> >>     .aggregate(HashSet::new, AggregatorFunction(),
>>> >> TimeWindows.of(60000).until(3600000))
>>> >>     .mapValues(HashSet::size)
>>> >>     .toStream()
>>> >>     .map((key, value) -> convertToProtobufObject(key, value))
>>> >>     .to()
>>> >>
>>> >>
>>> >>
>>> >>
>>> >>
>>> >>
>>> >> On Fri, Apr 28, 2017 at 1:13 PM, Matthias J. Sax <
>>> matthias@confluent.io>
>>> >> wrote:
>>> >>
>>> >>> Thanks for the details (sorry that I forgot that you did share the
>>> >>> output already).
>>> >>>
>>> >>> Might be a dumb question, but what is the count for missing windows
>>> in
>>> >>> your seconds implementation?
>>> >>>
>>> >>> If there is no data for a window, it should not emit a window with
>>> count
>>> >>> zero, but nothing.
>>> >>>
>>> >>> Thus, looking at your output, I am wondering how it could contain
>>> line
>>> >>> like:
>>> >>>
>>> >>>> 2017-04-27T04:53:00 0
>>> >>>
>>> >>> I am also wondering why your output only contains a single value per
>>> >>> window. As Streams outputs multiple updates per window while the
>>> count
>>> >>> is increasing, you should actually see multiple records per window.
>>> >>>
>>> >>> Your code is like this:
>>> >>>
>>> >>> stream.filter().groupByKey().count(TimeWindow.of(60000)).to();
>>> >>>
>>> >>> Or do you have something more complex?
>>> >>>
>>> >>>
>>> >>> -Matthias
>>> >>>
>>> >>>
>>> >>> On 4/27/17 9:16 PM, Mahendra Kariya wrote:
>>> >>>>> Can you somehow verify your output?
>>> >>>>
>>> >>>>
>>> >>>> Do you mean the Kafka streams output? In the Kafka Streams output,
>>> we do
>>> >>>> see some missing values. I have attached the Kafka Streams output
>>> (for a
>>> >>>> few hours) in the very first email of this thread for reference.
>>> >>>>
>>> >>>> Let me also summarise what we have done so far.
>>> >>>>
>>> >>>> We took a dump of the raw data present in the source topic. We
>>> wrote a
>>> >>>> script to read this data and do the exact same aggregations that we
>>> do
>>> >>>> using Kafka Streams. And then we compared the output from Kafka
>>> Streams
>>> >>> and
>>> >>>> our script.
>>> >>>>
>>> >>>> The difference that we observed in the two outputs is that there
>>> were a
>>> >>> few
>>> >>>> rows (corresponding to some time windows) missing in the Streams
>>> output.
>>> >>>> For the time windows for which the data was present, the aggregated
>>> >>> numbers
>>> >>>> matched exactly.
>>> >>>>
>>> >>>> This means, either all the records for a particular time window are
>>> being
>>> >>>> skipped, or none. Now this is highly unlikely to happen. Maybe
>>> there is a
>>> >>>> bug somewhere in the rocksdb state stores? Just a speculation, not
>>> sure
>>> >>>> though. And there could even be a bug in the reported metric.
>>> >>>>
>>> >>>
>>> >>>
>>> >>
>>> >
>>>
>>>
>>
>