Posted to users@kafka.apache.org by Ali Akhtar <al...@gmail.com> on 2016/10/07 01:25:56 UTC

Printing to stdin from KStreams?

What the subject says. For dev, it would be a lot easier if debugging info
could be printed to stdout instead of to another topic, where it would persist.

Any ideas if this is possible?

Re: Printing to stdin from KStreams?

Posted by Michael Noll <mi...@confluent.io>.
Nice idea. :-)

Happy to hear it works for you, and thanks for sharing your workaround with
the mailing list.

On Fri, Oct 7, 2016 at 5:33 PM, Ali Akhtar <al...@gmail.com> wrote:

> Thank you.
>
> I've resolved this by adding a run config in Intellij for running
> streams-reset, and using the same application id in all applications in
> development (transparently reading the application id from environment
> variables, so in my kubernetes config I can specify different app ids for
> production)
>
> On Fri, Oct 7, 2016 at 8:05 PM, Michael Noll <mi...@confluent.io> wrote:
>
> > > Is it possible to have kafka-streams-reset be automatically called
> during
> > > development? Something like streams.cleanUp() but which also does
> reset?
> >
> > Unfortunately this isn't possible (yet), Ali.  I am also not aware of any
> > plan to add such a feature in the short-term.
> >
> >
> >
> > On Fri, Oct 7, 2016 at 1:36 PM, Ali Akhtar <al...@gmail.com> wrote:
> >
> > > Is it possible to have kafka-streams-reset be automatically called
> during
> > > development? Something like streams.cleanUp() but which also does
> reset?
> > >
> > > On Fri, Oct 7, 2016 at 2:45 PM, Michael Noll <mi...@confluent.io>
> > wrote:
> > >
> > > > Ali,
> > > >
> > > > adding to what Matthias said:
> > > >
> > > > Kafka 0.10 changed the message format to add so-called "embedded
> > > > timestamps" into each Kafka message.  The Java producer included in
> > Kafka
> > > > 0.10 includes such embedded timestamps into any generated message as
> > > > expected.
> > > >
> > > > However, other clients (like the go kafka plugin you are using) may
> not
> > > > have been updated yet to be compatible with the new 0.10 message
> > format.
> > > > That's the root cause why see these "-1" negative timestamps.   (The
> > same
> > > > negative timestamp problem also happens if you attempt to read
> messages
> > > > that were generated with pre-0.10 versions of Kafka's Java producer.)
> > > >
> > > > FYI: Kafka Streams' default timestamp extractor attempts to read
> those
> > > new
> > > > embedded timestamps.  If there are no such embedded timestamps, you
> run
> > > > into these "negative timestamps" errors.
> > > >
> > > > Now, how to fix your problem?
> > > >
> > > > - Fix the root cause: Check if there's a newer version of your Go
> kafka
> > > > plugin that generates messages in the new Kafka 0.10 format.  If
> there
> > is
> > > > no such version, ask the maintainers for an update. :-)
> > > >
> > > > - Work around the problem:  As Matthias said, you can also tell Kafka
> > > > Streams to not use its default timestamp extractor.  You can fallback
> > to
> > > > the WallclockTimestampExtractor, though this means your application
> > will
> > > > not use event-time but processing-time when processing your data,
> which
> > > is
> > > > probably not what you want (but it does prevent the -1 timestamp
> > errors).
> > > > If your data (generated by the go kafka plugin) *does* contain
> > timestamp
> > > > information in the message payload, then the better option is to
> write
> > a
> > > > custom timestamp extract that inspects each message, extracts the
> > > timestamp
> > > > from the payload, and returns it to Kafka Streams.  The Timestamp
> > > Extractor
> > > > section in [1] explains how to write a custom one and how to
> configure
> > > your
> > > > app to use it.
> > > >
> > > > Hope this helps,
> > > > Michael
> > > >
> > > >
> > > >
> > > > [1]
> > > > http://docs.confluent.io/3.0.1/streams/developer-guide.
> > > > html#optional-configuration-parameters
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Fri, Oct 7, 2016 at 5:17 AM, Matthias J. Sax <
> matthias@confluent.io
> > >
> > > > wrote:
> > > >
> > > > > -----BEGIN PGP SIGNED MESSAGE-----
> > > > > Hash: SHA512
> > > > >
> > > > > If you restart your application, it will resume where is left off
> > > > > (same as any other Kafka consumer that does use group management
> and
> > > > > commits offsets).
> > > > >
> > > > > If you want to reprocess data from scratch, you need to reset your
> > > > > application using bin/kafka-streams-application-reset.sh
> > > > >
> > > > > See also
> > > > > http://docs.confluent.io/3.0.1/streams/developer-guide.
> > > html#application-
> > > > > reset-tool
> > > > >
> > > > > and
> > > > > http://www.confluent.io/blog/data-reprocessing-with-kafka-
> > > streams-resett
> > > > > ing-a-streams-application/
> > > > >
> > > > >
> > > > > About the timestamp issue: it seems that your Go client does not
> > > > > assign valid timestamps when writing the data. As you already said,
> > > > > you need to provide a custom TimestampExtractor (or you
> > > > > WallclockTimestampExtractor if semantic permit) instead of default
> > > > > ConsumerRecordTimestampExtractor)
> > > > >
> > > > >
> > > > > - -Matthias
> > > > >
> > > > > On 10/6/16 7:53 PM, Ali Akhtar wrote:
> > > > > > Thanks.
> > > > > >
> > > > > > I'm encountering a strange issue.
> > > > > >
> > > > > > If I create messages thru console-producer.sh on a new topic,
> > > > > > things work fine.
> > > > > >
> > > > > > But on the topic that I need to consume, the messages are being
> > > > > > produced via the go kafka plugin.
> > > > > >
> > > > > > On this topic, at first, nothing happens when the stream starts
> > > > > > (i.e it doesn't process the messages which are already in there)
> > > > > >
> > > > > > Then, if I produce new messages, then my exception handler is
> > > > > > called with the exception that timestamp is negative.
> > > > > >
> > > > > > I'm pretty sure I'm on kafka 0.10. I downloaded it 2-3 weeks ago.
> > > > > >
> > > > > > I'm going to write a new timestamp extractor, but any ideas why
> > > > > > nothing happens with the old messages which are in the topic, it
> > > > > > only responds if i push new messages to this topic?
> > > > > >
> > > > > > On Fri, Oct 7, 2016 at 7:25 AM, Matthias J. Sax
> > > > > > <ma...@confluent.io> wrote:
> > > > > >
> > > > > > Sure.
> > > > > >
> > > > > > Just use #print() or #writeAsText()
> > > > > >
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > > On 10/6/16 6:25 PM, Ali Akhtar wrote:
> > > > > >>>> What the subject says. For dev, it would be a lot easier if
> > > > > >>>> debugging info can be printed to stdin instead of another
> > > > > >>>> topic, where it will persist.
> > > > > >>>>
> > > > > >>>> Any ideas if this is possible?
> > > > > >>>>
> > > > > >>
> > > > > >
> > > > > -----BEGIN PGP SIGNATURE-----
> > > > > Comment: GPGTools - https://gpgtools.org
> > > > >
> > > > > iQIcBAEBCgAGBQJX9xO7AAoJECnhiMLycopPLFwQAK76xmPobB5xFdE/qFWm94d5
> > > > > 8lj8LahMMBUCG4xWCEs4shvGHjkh2kx2cUZmdgkUSLtEy7HGK6MgmjTa8Smse+0f
> > > > > JxQ0f/F8lkMvJKhuw9wmgOKnT/b/U/jRXvUedWvYXp/r7d8Z36DgW9hzO9Yx7Ugq
> > > > > qafvCfMdScE4FZEOoU/wJLiRJ3FZZsEobQSbyXR9Vmjs9UYUExUpq02B2N0ItvWz
> > > > > 6JPtWNC2PWSlc7j7C7PK0XYeexuE/ZK9yLrM7iuXh6fYeTy3MtBV3pHsDn3d+I2m
> > > > > AOUUMyJt4kaSyMX0BzWZVVKZVvdw7rbbGfsZisw67Mko2p+De2KoIEF3yEEvYEit
> > > > > Vks00KzGZ1gvGdMDvKzJJRkMVLUxl2R4LxH/TEwXKn5WYQullEHtQ3xb0U7sl2Ae
> > > > > UkIgOw0LY3sQj7NL2OOnt9dMS5m0r+FZPlMoYNmN7coAGxo98iKacIqR1tc3f2qs
> > > > > NxW2iUvD9lzVaZhaMOY3AjD1Q2G7yyQ+wRdlcZtNkAAID4mNrbyu7MKT7x6paLUf
> > > > > OXGjtl8tcMwegSqZtpEZnJFSquu0SuX2UraDWDiz6NaW+TYIM8Qnq3oF9gWDQX+I
> > > > > gvtqMiGzxxs4ZW9ysueT+X1MRoPRrnwlXPQ7XVFR6oKMEJrw0W2x8TkyHJiXVdls
> > > > > ZBA0KEjx9U8NNf+eiqN5
> > > > > =UMbs
> > > > > -----END PGP SIGNATURE-----
> > > > >
> > > >
> > >
> >
>

Re: Printing to stdin from KStreams?

Posted by Ali Akhtar <al...@gmail.com>.
Thank you.

I've resolved this by adding a run config in IntelliJ for running
streams-reset, and using the same application id in all applications in
development (transparently reading the application id from environment
variables, so in my Kubernetes config I can specify different app ids for
production).
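For reference, the environment-based application id described above can be sketched as follows. The variable name APP_ID and the dev default are assumptions for illustration, not taken from the thread:

```java
import java.util.Properties;

public class AppIdFromEnv {
    public static void main(String[] args) {
        // Read the application id from the environment; fall back to a
        // dev id when the variable is unset (e.g. outside Kubernetes).
        // The name "APP_ID" and the default value are assumptions.
        String appId = System.getenv().getOrDefault("APP_ID", "my-app-dev");

        Properties props = new Properties();
        // "application.id" is the StreamsConfig.APPLICATION_ID_CONFIG key.
        props.put("application.id", appId);

        System.out.println(props.getProperty("application.id"));
    }
}
```

In a Kubernetes deployment you would then set APP_ID per environment in the pod spec, so dev and production instances never share an application id.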


Re: Printing to stdin from KStreams?

Posted by Michael Noll <mi...@confluent.io>.
> Is it possible to have kafka-streams-reset be automatically called during
> development? Something like streams.cleanUp() but which also does reset?

Unfortunately this isn't possible (yet), Ali.  I am also not aware of any
plan to add such a feature in the short-term.
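Until such a feature exists, one dev-only workaround in the spirit of "streams.cleanUp() but which also does reset" is to shell out to the reset tool before starting the topology. The script path, app id, and flags below are assumptions for illustration; this is a sketch, not something to wire into production code:

```java
// Dev-only sketch: invoke the application reset tool, then start as usual.
// The script path, app id, and flag names are assumptions for illustration.
ProcessBuilder reset = new ProcessBuilder(
        "/opt/kafka/bin/kafka-streams-application-reset.sh",
        "--application-id", "my-app-dev");
reset.inheritIO();
int exitCode = reset.start().waitFor();
if (exitCode != 0) {
    throw new IllegalStateException("reset tool failed: " + exitCode);
}

// Then clear this instance's local state and start the topology:
// streams.cleanUp();   // KafkaStreams#cleanUp() wipes the local state dir
// streams.start();
```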




Re: Printing to stdin from KStreams?

Posted by Ali Akhtar <al...@gmail.com>.
Is it possible to have kafka-streams-reset be automatically called during
development? Something like streams.cleanUp() but which also does reset?


Re: Printing to stdin from KStreams?

Posted by Michael Noll <mi...@confluent.io>.
Ali,

adding to what Matthias said:

Kafka 0.10 changed the message format to add so-called "embedded
timestamps" into each Kafka message.  The Java producer included in Kafka
0.10 includes such embedded timestamps into any generated message as
expected.

However, other clients (like the go kafka plugin you are using) may not
have been updated yet to be compatible with the new 0.10 message format.
That's the root cause of why you see these "-1" negative timestamps.  (The
same negative timestamp problem also happens if you attempt to read
messages that were generated with pre-0.10 versions of Kafka's Java
producer.)

FYI: Kafka Streams' default timestamp extractor attempts to read those new
embedded timestamps.  If there are no such embedded timestamps, you run
into these "negative timestamps" errors.

Now, how to fix your problem?

- Fix the root cause: Check if there's a newer version of your Go kafka
plugin that generates messages in the new Kafka 0.10 format.  If there is
no such version, ask the maintainers for an update. :-)

- Work around the problem:  As Matthias said, you can also tell Kafka
Streams not to use its default timestamp extractor.  You can fall back to
the WallclockTimestampExtractor, though this means your application will
use processing-time rather than event-time when processing your data, which
is probably not what you want (but it does prevent the -1 timestamp errors).
If your data (generated by the go kafka plugin) *does* contain timestamp
information in the message payload, then the better option is to write a
custom timestamp extractor that inspects each message, extracts the
timestamp from the payload, and returns it to Kafka Streams.  The Timestamp
Extractor section in [1] explains how to write a custom one and how to
configure your app to use it.
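As a concrete illustration of such a payload-based extractor against the 0.10.0 TimestampExtractor API (the HasTimestamp interface is hypothetical; adapt the cast and parsing to your actual message format):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Sketch of a custom extractor that pulls the event time out of the
// message payload. "HasTimestamp" is a hypothetical interface on your
// own value type, shown here only for illustration.
public class PayloadTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        Object value = record.value();
        if (value instanceof HasTimestamp) {
            return ((HasTimestamp) value).timestampMillis();
        }
        // Fall back to wall-clock time rather than returning a negative
        // timestamp that would make Kafka Streams reject the record.
        return System.currentTimeMillis();
    }
}
```

You would then register it via the timestamp.extractor config key (StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG) in your streams properties.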

Hope this helps,
Michael



[1]
http://docs.confluent.io/3.0.1/streams/developer-guide.html#optional-configuration-parameters







Re: Printing to stdin from KStreams?

Posted by "Matthias J. Sax" <ma...@confluent.io>.

If you restart your application, it will resume where it left off
(same as any other Kafka consumer that uses group management and
commits offsets).

If you want to reprocess data from scratch, you need to reset your
application using bin/kafka-streams-application-reset.sh

See also
http://docs.confluent.io/3.0.1/streams/developer-guide.html#application-reset-tool

and
http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
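The invocation looks roughly like this (the application id and topic
name below are placeholders, and the available flags can differ between
Kafka versions, so check the tool's --help output on your installation):

```shell
# Placeholders: substitute your own application id and input topics.
# Stop all running instances of the application before resetting.
bin/kafka-streams-application-reset.sh \
    --application-id my-streams-app \
    --input-topics my-input-topic
```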


About the timestamp issue: it seems that your Go client does not
assign valid timestamps when writing the data. As you already said,
you need to provide a custom TimestampExtractor (or the
WallclockTimestampExtractor, if your semantics permit) instead of the
default ConsumerRecordTimestampExtractor.
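For example, a minimal sketch of the fallback logic such an extractor
needs (the class name is made up, and the Kafka-specific plumbing is
shown only in comments; a real extractor implements
org.apache.kafka.streams.processor.TimestampExtractor and reads the
timestamp from the ConsumerRecord):

```java
// Illustrative only: "FallbackTimestampExtractor" is a made-up name.
// A real extractor would implement
// org.apache.kafka.streams.processor.TimestampExtractor:
//
//   public long extract(ConsumerRecord<Object, Object> record) {
//       return resolve(record.timestamp(), System.currentTimeMillis());
//   }
//
// The Kafka types are omitted here so the core logic stands on its own.
public class FallbackTimestampExtractor {

    // Use the record's embedded timestamp when it is valid; otherwise fall
    // back to wall-clock time, so records written by producers that set no
    // (or negative) timestamps are processed instead of raising an exception.
    static long resolve(long embeddedTimestampMs, long wallClockMs) {
        return embeddedTimestampMs >= 0 ? embeddedTimestampMs : wallClockMs;
    }
}
```

You would then register the extractor via the "timestamp.extractor"
setting in your StreamsConfig.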


-Matthias

On 10/6/16 7:53 PM, Ali Akhtar wrote:
> Thanks.
> 
> I'm encountering a strange issue.
> 
> If I create messages thru console-producer.sh on a new topic,
> things work fine.
> 
> But on the topic that I need to consume, the messages are being
> produced via the go kafka plugin.
> 
> On this topic, at first, nothing happens when the stream starts
> (i.e. it doesn't process the messages which are already in there)
> 
> Then, if I produce new messages, then my exception handler is
> called with the exception that timestamp is negative.
> 
> I'm pretty sure I'm on kafka 0.10. I downloaded it 2-3 weeks ago.
> 
> I'm going to write a new timestamp extractor, but any ideas why
> nothing happens with the old messages which are in the topic, it
> only responds if I push new messages to this topic?
> 
> On Fri, Oct 7, 2016 at 7:25 AM, Matthias J. Sax
> <ma...@confluent.io> wrote:
> 
> Sure.
> 
> Just use #print() or #writeAsText()
> 
> 
> -Matthias
> 
> On 10/6/16 6:25 PM, Ali Akhtar wrote:
>>>> What the subject says. For dev, it would be a lot easier if 
>>>> debugging info can be printed to stdin instead of another
>>>> topic, where it will persist.
>>>> 
>>>> Any ideas if this is possible?
>>>> 
>> 
> 

Re: Printing to stdin from KStreams?

Posted by Ali Akhtar <al...@gmail.com>.
Thanks.

I'm encountering a strange issue.

If I create messages through console-producer.sh on a new topic, things
work fine.

But on the topic that I need to consume, the messages are being produced
via the Go Kafka plugin.

On this topic, at first, nothing happens when the stream starts (i.e. it
doesn't process the messages which are already in there).

Then, if I produce new messages, my exception handler is called with an
exception saying the timestamp is negative.

I'm pretty sure I'm on Kafka 0.10. I downloaded it 2-3 weeks ago.

I'm going to write a new timestamp extractor, but any ideas why nothing
happens with the old messages which are already in the topic, and why it
only responds if I push new messages to it?

On Fri, Oct 7, 2016 at 7:25 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Sure.
>
> Just use #print() or #writeAsText()
>
>
> -Matthias
>
> On 10/6/16 6:25 PM, Ali Akhtar wrote:
> > What the subject says. For dev, it would be a lot easier if
> > debugging info can be printed to stdin instead of another topic,
> > where it will persist.
> >
> > Any ideas if this is possible?
> >

Re: Printing to stdin from KStreams?

Posted by "Matthias J. Sax" <ma...@confluent.io>.

Sure.

Just use #print() or #writeAsText()
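Roughly, print() writes each record to stdout as it flows through the
topology, while writeAsText() dumps records to a local file. A
standalone sketch of the idea (the exact output format of the real
operator may differ, and the record data here is made up):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PrintSketch {
    // Roughly what KStream#print() does per record: format "key , value"
    // and write it to stdout. In a real topology you would simply call
    // stream.print() or stream.writeAsText("/tmp/debug.txt") on the
    // KStream you want to inspect; this class only illustrates the idea.
    static String format(Object key, Object value) {
        return key + " , " + value;
    }

    public static void main(String[] args) {
        // Made-up records standing in for what would flow through the stream.
        Map<String, String> records = new LinkedHashMap<>();
        records.put("user-1", "click");
        records.put("user-2", "view");
        records.forEach((k, v) -> System.out.println(format(k, v)));
    }
}
```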


-Matthias

On 10/6/16 6:25 PM, Ali Akhtar wrote:
> What the subject says. For dev, it would be a lot easier if
> debugging info can be printed to stdin instead of another topic,
> where it will persist.
> 
> Any ideas if this is possible?
> 