Posted to users@kafka.apache.org by Debasish Ghosh <gh...@gmail.com> on 2016/10/28 08:07:20 UTC

Problem with timestamp in Producer

Hello -

I am a beginner in Kafka .. with my first Kafka streams application ..

I have a streams application that reads from a topic, does some
transformation on the data and writes to another topic. The record that I
manipulate is a CSV record.

It runs fine when I run it on a local Kafka instance.

However when I run it on an AWS cluster, I get the following exception when
I try to produce the transformed record into the target topic.

Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
    at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
    at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
    at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
    at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
    at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
    at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
    at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
    at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)

Looks like the timestamp passed to the ProducerRecord is -1, though I am
not passing any timestamp explicitly. I am not sure why this happens. But I
see that the Javadoc for ProducerRecord says the following ..

The record also has an associated timestamp. If the user did not provide a
> timestamp, the producer will stamp the record with its current time. The
> timestamp eventually used by Kafka depends on the timestamp type configured
> for the topic.
> If the topic is configured to use CreateTime, the timestamp in the
> producer record will be used by the broker.
> If the topic is configured to use LogAppendTime, the timestamp in the
> producer record will be overwritten by the broker with the broker local
> time when it appends the message to its log.
> In either of the cases above, the timestamp that has actually been used
> will be returned to user in RecordMetadata
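
For reference, this seems to be the constructor behaviour the Javadoc describes. A rough sketch against the 0.10 producer API (topic, key and value below are only placeholders, not taken from my actual application):

import org.apache.kafka.clients.producer.ProducerRecord;

public class TimestampedRecordExample {
    public static void main(String[] args) {
        // No explicit timestamp: the field stays null and the producer
        // stamps the record with its current time on send().
        ProducerRecord<String, String> implicitTs =
                new ProducerRecord<>("target-topic", "some-key", "a,csv,record");

        // Explicit timestamp (partition left null so the partitioner decides).
        // A negative value such as -1 is rejected by this constructor with
        // the IllegalArgumentException shown in the stack trace above.
        ProducerRecord<String, String> explicitTs =
                new ProducerRecord<>("target-topic", null,
                        System.currentTimeMillis(), "some-key", "a,csv,record");

        System.out.println(implicitTs.timestamp()); // null
        System.out.println(explicitTs.timestamp()); // the value passed above
    }
}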


   1. Will this problem be solved if I configure the topic with
   LogAppendTime or CreateTime explicitly ?
   2. What is the default setting of this property in a newly created topic
   ?
   3. How do I change it (what is the name of the property to be set) ?
   4. Any idea why I face this problem in the cluster mode but not in the
   local mode ?

BTW I am using 0.10.1.

Any help / pointer will be appreciated.

regards.

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Re: Problem with timestamp in Producer

Posted by Debasish Ghosh <gh...@gmail.com>.
Thanks Matthias again for all the suggestions ..

On Monday 31 October 2016, Matthias J. Sax <ma...@confluent.io> wrote:

> -----BEGIN PGP SIGNED MESSAGE-----
> Hash: SHA512
>
> Just one comment to add:
>
> > it produces messages based
> >> on 0.9.0 which obviously cannot be consumed by 0.10.0 brokers.
>
> Strictly speaking, this is not correct. Brokers are compatible with
> older versions of clients (not the other way round, though). For a 0.9
> client writing to 0.10 brokers, it only happens that the client does
> not set the timestamp field, but this is not an issue by itself. If
> you consume a topic like this, that will also work.
>
> The issue you observed is a combination of different things: a
> situation as described above, plus a Streams property. In Streams, input
> record timestamps determine output record timestamps, and the 0.10
> producer used internally requires a valid TS value. Thus, a Streams
> application fails in this case.
>
> Btw: if you still want to use a 0.9 producer client, you could also
> change the broker setting from "create time" to "append time". Using
> append time, a valid TS will be set for all (newly) written records,
> regardless of the producer behavior. Keep in mind that this is not a
> global but a per-topic broker setting.
>
> - -Matthias
>
>
> On 10/30/16 1:15 AM, Debasish Ghosh wrote:
> > I think I found out what happened .. I was installing Kafka under
> > DC/OS on AWS and following this doc
> > https://dcos.io/docs/1.8/usage/tutorials/kafka/ for kafka
> > installation. This works fine and installs kafka version 0.10.0.
> >
> > But in the example where it shows how to produce and consume
> > messages, this doc says the following ..
> >
> > core@ip-10-0-6-153 ~ $ docker run -it mesosphere/kafka-client
> >> root@7d0aed75e582:/bin# echo "Hello, World." |
> >> ./kafka-console-producer.sh --broker-list KAFKA_ADDRESS:PORT
> >> --topic topic1
> >
> >
> > The problem is the docker run of kafka-client pulls in version
> > 0.9.0. When I use kafka-console-producer from this client, it
> > produces messages based on 0.9.0 which obviously cannot be consumed
> > by 0.10.0 brokers.
> >
> > Thanks for pointing in this direction .. I think it will be fixed
> > if I install a 0.10.0 client in its place.
> >
> > regards.
> >
> > On Sunday 30 October 2016, Matthias J. Sax <matthias@confluent.io> wrote:
> >
> > The simplest way should be to check the java classpath.
> >
> > Insert an
> >
> > echo $CLASSPATH
> >
> > at the end of bin/kafka-run-class.sh
> >
> > Then run bin/kafka-console-producer.sh with no argument.
> >
> > You should see the classpath printed out. Look for
> > 'kafka-clients-XXX.jar' -- XXX will be the version number.
> >
> >
> > -Matthias
> >
> >
> > On 10/29/16 12:11 AM, Debasish Ghosh wrote:
> >>>> Hello Mathias -
> >>>>
> >>>> Regarding ..
> >>>>
> >>>> In case you do have 0.10 brokers, it might however happen,
> >>>> that bin/kafka-console-producer.sh
> >>>>> does use 0.9 producer.
> >>>>
> >>>>
> >>>> How can I check this ?
> >>>>
> >>>> Thanks!
> >>>>
> >>>> On Sat, Oct 29, 2016 at 12:23 PM, Debasish Ghosh
> >>>> <ghosh.debasish@gmail.com> wrote:
> >>>>
> >>>>> I agree .. the problem is DC/OS still ships the older
> >>>>> version. Let me check if I can upgrade this ..
> >>>>>
> >>>>> Thanks!
> >>>>>
> >>>>> On Sat, Oct 29, 2016 at 12:21 PM, Matthias J. Sax
> >>>>> <matthias@confluent.io> wrote:
> >>>>>
> >>>> Btw: I would highly recommend to use Kafka 0.10.1 -- there
> >>>> are many new Streams feature and usability improvements and
> >>>> bug fixes.
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 10/28/16 11:42 PM, Matthias J. Sax wrote:
> >>>>>>>> That sounds reasonable. However, I am wondering how
> >>>>>>>> your Streams application can connect to 0.9 broker in
> >>>>>>>> the first place. Streams internally uses standard
> >>>>>>>> Kafka clients, and those are not backward compatible.
> >>>>>>>> Thus, the 0.10 Streams clients should not be able to
> >>>>>>>> connect to 0.9 broker.
> >>>>>>>>
> >>>>>>>> In case you do have 0.10 brokers, it might however
> >>>>>>>> happen, that bin/kafka-console-producer.sh does use
> >>>>>>>> 0.9 producer. Brokers are backward compatible, thus a
> >>>>>>>> 0.9 producer can write to a 0.10 broker (and in this
> >>>>>>>> case the record TS would be invalid). I assume that
> >>>>>>>> in your local environment you are using the 0.10
> >>>>>>>> bin/kafka-console-producer.sh and thus all works
> >>>>>>>> fine.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 10/28/16 11:00 PM, Debasish Ghosh wrote:
> >>>>>>>>> Hello Mathias -
> >>>>>>>>
> >>>>>>>>> Thanks a lot for the response. I think what may be
> >>>>>>>>> happening is a version mismatch between the
> >>>>>>>>> development & deployment versions of Kafka. The
> >>>>>>>>> Kafka streams application that I developed uses
> >>>>>>>>> 0.10.0 based libraries. And my local environment
> >>>>>>>>> contains a server installation of the same version.
> >>>>>>>>> Hence it works ok in my local environment.
> >>>>>>>>
> >>>>>>>>> But the DC/OS 1.8 deploys 0.9.0 of Kafka when I
> >>>>>>>>> install the service through DC/OS cli. And I use
> >>>>>>>>> this version to load records into the input topic.
> >>>>>>>>> And try to consume using the deployed streams
> >>>>>>>>> application which I developed using 0.10.0. Hence
> >>>>>>>>> the producer did not put the timestamp while the
> >>>>>>>>> consumer expects to have one.
> >>>>>>>>
> >>>>>>>>> I need to check if 0.10.x is available for DC/OS
> >>>>>>>>> ..
> >>>>>>>>
> >>>>>>>>> Thanks again for your suggestions.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>> On Sat, Oct 29, 2016 at 3:23 AM, Matthias J. Sax
> >>>>>>>>> <matthias@confluent.io> wrote:
> >>>>>>>>
> >>>>>>>>> Hey,
> >>>>>>>>
> >>>>>>>>> we just added a new FAQ entry for upcoming CP 3.2
> >>>>>>>>> release that answers your question. I just c&p it
> >>>>>>>>> here. More concrete answer below.
> >>>>>>>>
> >>>>>>>>>>>> If you get an exception similar to the one
> >>>>>>>>>>>> shown below, there are multiple possible
> >>>>>>>>>>>> causes:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Exception in thread "StreamThread-1"
> >>>>>>>>>>>> java.lang.IllegalArgumentException: Invalid timestamp -1
> >>>>>>>>>>>> at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
> >>>>>>>>>>>>
> >>>>>>>>>>>> This error means that the timestamp extractor
> >>>>>>>>>>>> of your Kafka Streams application failed to
> >>>>>>>>>>>> extract a valid timestamp from a record.
> >>>>>>>>>>>> Typically, this points to a problem with the
> >>>>>>>>>>>> record (e.g., the record does not contain a
> >>>>>>>>>>>> timestamp at all), but it could also indicate
> >>>>>>>>>>>> a problem or bug in the timestamp extractor
> >>>>>>>>>>>> used by the application.
> >>>>>>>>>>>>
> >>>>>>>>>>>> When does a record not contain a valid
> >>>>>>>>>>>> timestamp:
> >>>>>>>>>>>>
> >>>>>>>>>>>> If you are using the default
> >>>>>>>>>>>> ConsumerRecordTimestampExtractor, it is most
> >>>>>>>>>>>> likely that your records do not carry an
> >>>>>>>>>>>> embedded timestamp (embedded record
> >>>>>>>>>>>> timestamps got introduced in Kafka's message
> >>>>>>>>>>>> format in Kafka 0.10). This might happen, if
> >>>>>>>>>>>> you consume a topic that is written by old
> >>>>>>>>>>>> Kafka producer clients (ie, version 0.9 or
> >>>>>>>>>>>> earlier) or third party producer clients. A
> >>>>>>>>>>>> common situation where this may happen is
> >>>>>>>>>>>> after upgrading your Kafka cluster from 0.9
> >>>>>>>>>>>> to 0.10, where all the data that was
> >>>>>>>>>>>> generated with 0.9 is not compatible with the
> >>>>>>>>>>>> 0.10 message format. If you are using a
> >>>>>>>>>>>> custom timestamp extractor, make sure that
> >>>>>>>>>>>> your extractor is robust to missing
> >>>>>>>>>>>> timestamps in your records. For example, you
> >>>>>>>>>>>> can return a default or estimated timestamp
> >>>>>>>>>>>> if you cannot extract a valid timestamp
> >>>>>>>>>>>> (maybe the timestamp field in your data is
> >>>>>>>>>>>> just missing). You can also switch to
> >>>>>>>>>>>> processing time semantics via
> >>>>>>>>>>>> WallclockTimestampExtractor; whether such a
> >>>>>>>>>>>> fallback is an appropriate response to this
> >>>>>>>>>>>> situation depends on your use case. However,
> >>>>>>>>>>>> as a first step you should identify and fix
> >>>>>>>>>>>> the root cause for why such problematic
> >>>>>>>>>>>> records were written to Kafka in the first
> >>>>>>>>>>>> place. In a second step you may consider
> >>>>>>>>>>>> applying workarounds (as described above)
> >>>>>>>>>>>> when dealing with such records (for example,
> >>>>>>>>>>>> if you need to process those records after
> >>>>>>>>>>>> all). Another option is to regenerate the
> >>>>>>>>>>>> records with correct timestamps and write
> >>>>>>>>>>>> them to a new Kafka topic.
> >>>>>>>>>>>>
> >>>>>>>>>>>> When the timestamp extractor causes the
> >>>>>>>>>>>> problem:
> >>>>>>>>>>>>
> >>>>>>>>>>>> In this situation you should debug and fix
> >>>>>>>>>>>> the erroneous extractor. If the extractor is
> >>>>>>>>>>>> built into Kafka, please report the bug to
> >>>>>>>>>>>> the Kafka developer mailing list at
> >>>>>>>>>>>> dev@kafka.apache.org (see
> >>>>>>>>>>>> instructions
> >>>>>>>>>>>> http://kafka.apache.org/contact); in the
> >>>>>>>>>>>> meantime, you may write a custom timestamp
> >>>>>>>>>>>> extractor that fixes the problem and
> >>>>>>>>>>>> configure your application to use that
> >>>>>>>>>>>> extractor for the time being.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>> To address your questions more concretely:
> >>>>>>>>
> >>>>>>>>> 1. Yes and no: yes for any new data you write to
> >>>>>>>>> your topic, no for any already written data that
> >>>>>>>>> does not have a valid timestamp set.
> >>>>>>>>> 2. Default is create time.
> >>>>>>>>> 3. Config parameter "message.timestamp.type". It's a
> >>>>>>>>> broker-side per-topic setting (however, be aware that
> >>>>>>>>> the Java KafkaProducer does verify the timestamp locally
> >>>>>>>>> before sending the message to the broker, thus on
> >>>>>>>>> -1 there will be the client-side exception you did
> >>>>>>>>> observe).
> >>>>>>>>> 4. I assume that you do consume different
> >>>>>>>>> topics with different TS fields in your records.
> >>>>>>>>
> >>>>>>>>> Also have a look at:
> >>>>>>>>> http://docs.confluent.io/current/streams/concepts.html#time
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>> - -Matthias
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>> On 10/28/16 5:42 AM, Debasish Ghosh wrote:
> >>>>>>>>>>>> I am actually using 0.10.0 and NOT 0.10.1 as
> >>>>>>>>>>>> I mentioned in the last mail. And I am using
> >>>>>>>>>>>> Kafka within a DC/OS cluster under AWS.
> >>>>>>>>>>>>
> >>>>>>>>>>>> The version that I mentioned works ok is on
> >>>>>>>>>>>> my local machine using a local Kafka
> >>>>>>>>>>>> installation. And it works for both single
> >>>>>>>>>>>> broker and multi broker scenario.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks.
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Fri, Oct 28, 2016 at 1:37 PM, Debasish
> >>>>>>>>>>>> Ghosh <ghosh.debasish@gmail.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hello -
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I am a beginner in Kafka .. with my first
> >>>>>>>>>>>>> Kafka streams application ..
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I have a streams application that reads
> >>>>>>>>>>>>> from a topic, does some transformation on
> >>>>>>>>>>>>> the data and writes to another topic. The
> >>>>>>>>>>>>> record that I manipulate is a CSV record.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> It runs fine when I run it on a local
> >>>>>>>>>>>>> Kafka instance.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> However when I run it on an AWS cluster, I
> >>>>>>>>>>>>> get the following exception when I try to
> >>>>>>>>>>>>> produce the transformed record into the
> >>>>>>>>>>>>> target topic.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
> >>>>>>>>>>>>> at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
> >>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
> >>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>>>>>>>>>>>> at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
> >>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>>>>>>>>>>>> at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
> >>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
> >>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
> >>>>>>>>>>>>> at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
> >>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>>>>>>>>>>>> at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
> >>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
> >>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
> >>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
> >>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Looks like the timestamp passed to the
> >>>>>>>>>>>>> ProducerRecord is -1, though I am not
> >>>>>>>>>>>>> passing any timestamp explicitly. I am not
> >>>>>>>>>>>>> sure why this happens. But I see that the
> >>>>>>>>>>>>> Javadoc for ProducerRecord says the
> >>>>>>>>>>>>> following ..
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The record also has an associated
> >>>>>>>>>>>>> timestamp. If the user did not provide a
> >>>>>>>>>>>>>> timestamp, the producer will stamp the
> >>>>>>>>>>>>>> record with its current time. The
> >>>>>>>>>>>>>> timestamp eventually used by Kafka
> >>>>>>>>>>>>>> depends on the timestamp type configured
> >>>>>>>>>>>>>> for the topic. If the topic is configured
> >>>>>>>>>>>>>> to use CreateTime, the timestamp in the
> >>>>>>>>>>>>>> producer record will be used by the
> >>>>>>>>>>>>>> broker. If the topic is configured to
> >>>>>>>>>>>>>> use LogAppendTime, the timestamp in the
> >>>>>>>>>>>>>> producer record will be overwritten by
> >>>>>>>>>>>>>> the broker with the broker local time
> >>>>>>>>>>>>>> when it appends the message to its log.
> >>>>>>>>>>>>>> In either of the cases above, the
> >>>>>>>>>>>>>> timestamp that has actually been used
> >>>>>>>>>>>>>> will be returned to user in
> >>>>>>>>>>>>>> RecordMetadata
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1. Will this problem be solved if I
> >>>>>>>>>>>>> configure the topic with LogAppendTime or
> >>>>>>>>>>>>> CreateTime explicitly ? 2. What is the
> >>>>>>>>>>>>> default setting of this property in a newly
> >>>>>>>>>>>>> created topic ? 3. How do I change it (what
> >>>>>>>>>>>>> is the name of the property to be set) ?
> >>>>>>>>>>>>> 4. Any idea why I face this problem in the
> >>>>>>>>>>>>> cluster mode but not in the local mode ?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> BTW I am using 0.10.1.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Any help / pointer will be appreciated ?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> regards.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> -- Debasish Ghosh
> >>>>>>>>>>>>> http://manning.com/ghosh2
> >>>>>>>>>>>>> http://manning.com/ghosh
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Twttr: @debasishg Blog:
> >>>>>>>>>>>>> http://debasishg.blogspot.com Code:
> >>>>>>>>>>>>> http://github.com/debasishg
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> -- Debasish Ghosh http://manning.com/ghosh2
> >>>>> http://manning.com/ghosh
> >>>>>
> >>>>> Twttr: @debasishg Blog: http://debasishg.blogspot.com
> >>>>> Code: http://github.com/debasishg
> >>>>>
> >>>>
> >>>>
> >>>>
> >>
> >
> >
> -----BEGIN PGP SIGNATURE-----
> Comment: GPGTools - https://gpgtools.org
>
> iQIcBAEBCgAGBQJYFs69AAoJECnhiMLycopPLmkQAKGTE0GRxduQ7VHXCrBC2EJR
> ZiObYfPaI1dqkPz0m/rEgPAcUpBB855uzs9Oa77hngfT3xfwa05kDDrH+eEgsYgI
> YdqLt02inPd11UKztj+i8x1mQAqVB4IGzCQExmGgX9yDWpXpVgv+t+3gByVaoryK
> 18LxmbVHOeIiUEpsjyXeknlZUomOZ1fO30nSLCL+zmOjLjOnHicd5IYTOrRuBIGR
> J2bTvI0bxuDPJlIDKW7/nHC9OfInHcyO2B8k0LPK7IFgDlb/z+ICazbiIwjzkwrP
> fsuLla11N8+FkH3GNZOtP/gAgVJXmhVerZtAKn85xOGajQcs5o3MWJV3YCBKXtOq
> k+gqxP4GbWUIP/Zz+5QxD6bRugj0I86XiaMOrBfUjkXBtnDJCdWhDAP3WIZMYoFu
> ZbT33kakDMPRLOlxEiPkM2bPYe99SqzSh+v/Z2fiI8sqsGVgkPcwEkAWZupjy6b4
> /c2CHXK1wnhoRpOKpQUoKparZFp5v6cY56/vPjxU32soXyG0rx/0Jd8JHv7x2AFD
> 0xi7PvA6QvIF9m8RdMddrMjwQ5VW1yeYL20ZDf7mGQr+S96rHg7zqdYhwjkgWERx
> lBJQ8Q6sHF7XeRZvoShCxZ06BtD7LTokintAUjaV3+jX/pvxNGgBrQ7aD7Lyw4k/
> tnDi1J+ISJanK15EGPg6
> =gy51
> -----END PGP SIGNATURE-----
>


-- 
Sent from my iPhone

Re: Problem with timestamp in Producer

Posted by "Matthias J. Sax" <ma...@confluent.io>.
-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA512

Just one comment to add:

> it produces messages based
>> on 0.9.0 which obviously cannot be consumed by 0.10.0 brokers.

Strictly speaking, this is not correct. Brokers are compatible with
older versions of clients (not the other way round, though). For a 0.9
client writing to 0.10 brokers, it only happens that the client does
not set the timestamp field, but this is not an issue by itself. If
you consume a topic like this, that will also work.

The issue you observed is a combination of different things: a
situation as described above, plus a Streams property. In Streams, input
record timestamps determine output record timestamps, and the 0.10
producer used internally requires a valid TS value. Thus, a Streams
application fails in this case.

Btw: if you still want to use a 0.9 producer client, you could also
change the broker setting from "create time" to "append time". Using
append time, a valid TS will be set for all (newly) written records,
regardless of the producer behavior. Keep in mind that this is not a
global but a per-topic broker setting.
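
If changing the topic config is not an option either, the Streams-side
workaround mentioned in the FAQ quoted below is a more lenient timestamp
extractor. A minimal sketch against the 0.10.x single-argument
TimestampExtractor interface (class name and fallback policy are only
illustrative; you could also just plug in the built-in
WallclockTimestampExtractor for pure processing-time semantics):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TimestampExtractor;

import java.util.Properties;

// Falls back to wall-clock time when the embedded record timestamp is
// missing/invalid (e.g. -1 for records written by a 0.9 producer),
// instead of letting the Streams producer fail downstream.
public class LenientTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        long ts = record.timestamp();
        return ts >= 0 ? ts : System.currentTimeMillis();
    }

    // Only the property relevant to this thread; application id,
    // bootstrap servers, serdes etc. go into the same Properties object.
    public static Properties timestampExtractorProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                LenientTimestampExtractor.class.getName());
        return props;
    }
}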

- -Matthias


On 10/30/16 1:15 AM, Debasish Ghosh wrote:
> I think I found out what happened .. I was installing Kafka under
> DC/OS on AWS and following this doc
> https://dcos.io/docs/1.8/usage/tutorials/kafka/ for kafka
> installation. This works fine and installs kafka version 0.10.0.
> 
> But in the example where it shows how to produce and consume
> messages, this doc says the following ..
> 
> core@ip-10-0-6-153 ~ $ docker run -it mesosphere/kafka-client
>> root@7d0aed75e582:/bin# echo "Hello, World." | 
>> ./kafka-console-producer.sh --broker-list KAFKA_ADDRESS:PORT
>> --topic topic1
> 
> 
> The problem is the docker run of kafka-client pulls in version
> 0.9.0. When I use kafka-console-producer from this client, it
> produces messages based on 0.9.0 which obviously cannot be consumed
> by 0.10.0 brokers.
> 
> Thanks for pointing in this direction .. I think it will be fixed
> if I install a 0.10.0 client in its place.
> 
> regards.
> 
> On Sunday 30 October 2016, Matthias J. Sax <ma...@confluent.io>
> wrote:
> 
> The simplest way should be to check the java classpath.
> 
> Insert an
> 
> echo $CLASSPATH
> 
> at the end of bin/kafka-run-class.sh
> 
> Then run bin/kafka-console-producer.sh with no argument.
>
> You should see the classpath printed out. Look for
> 'kafka-clients-XXX.jar' -- XXX will be the version number.
> 
> 
> -Matthias
> 
> 
> On 10/29/16 12:11 AM, Debasish Ghosh wrote:
>>>> Hello Mathias -
>>>> 
>>>> Regarding ..
>>>> 
>>>> In case you do have 0.10 brokers, it might however happen,
>>>> that bin/kafka-console-producer.sh
>>>>> does use 0.9 producer.
>>>> 
>>>> 
>>>> How can I check this ?
>>>> 
>>>> Thanks!
>>>> 
>>>> On Sat, Oct 29, 2016 at 12:23 PM, Debasish Ghosh 
>>>> <ghosh.debasish@gmail.com> wrote:
>>>> 
>>>>> I agree .. the problem is DC/OS still ships the older
>>>>> version. Let me check if I can upgrade this ..
>>>>> 
>>>>> Thanks!
>>>>> 
>>>>> On Sat, Oct 29, 2016 at 12:21 PM, Matthias J. Sax 
>>>>> <matthias@confluent.io> wrote:
>>>>> 
>>>> Btw: I would highly recommend to use Kafka 0.10.1 -- there
>>>> are many new Streams feature and usability improvements and
>>>> bug fixes.
>>>> 
>>>> -Matthias
>>>> 
>>>> On 10/28/16 11:42 PM, Matthias J. Sax wrote:
>>>>>>>> That sounds reasonable. However, I am wondering how
>>>>>>>> your Streams application can connect to 0.9 broker in
>>>>>>>> the first place. Streams internally uses standard
>>>>>>>> Kafka clients, and those are not backward compatible.
>>>>>>>> Thus, the 0.10 Streams clients should not be able to
>>>>>>>> connect to 0.9 broker.
>>>>>>>> 
>>>>>>>> In case you do have 0.10 brokers, it might however
>>>>>>>> happen, that bin/kafka-console-producer.sh does use
>>>>>>>> 0.9 producer. Brokers are backward compatible, thus a
>>>>>>>> 0.9 producer can write to a 0.10 broker (and in this
>>>>>>>> case the record TS would be invalid). I assume that
>>>>>>>> in your local environment you are using the 0.10
>>>>>>>> bin/kafka-console-producer.sh and thus all works
>>>>>>>> fine.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> -Matthias
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On 10/28/16 11:00 PM, Debasish Ghosh wrote:
>>>>>>>>> Hello Mathias -
>>>>>>>> 
>>>>>>>>> Thanks a lot for the response. I think what may be 
>>>>>>>>> happening is a version mismatch between the
>>>>>>>>> development & deployment versions of Kafka. The
>>>>>>>>> Kafka streams application that I developed uses
>>>>>>>>> 0.10.0 based libraries. And my local environment
>>>>>>>>> contains a server installation of the same version.
>>>>>>>>> Hence it works ok in my local environment.
>>>>>>>> 
>>>>>>>>> But the DC/OS 1.8 deploys 0.9.0 of Kafka when I
>>>>>>>>> install the service through DC/OS cli. And I use
>>>>>>>>> this version to load records into the input topic.
>>>>>>>>> And try to consume using the deployed streams
>>>>>>>>> application which I developed using 0.10.0. Hence
>>>>>>>>> the producer did not put the timestamp while the
>>>>>>>>> consumer expects to have one.
>>>>>>>> 
>>>>>>>>> I need to check if 0.10.x is available for DC/OS
>>>>>>>>> ..
>>>>>>>> 
>>>>>>>>> Thanks again for your suggestions.
>>>>>>>> 
>>>>>>>> 
>>>>>>>>> On Sat, Oct 29, 2016 at 3:23 AM, Matthias J. Sax 
>>>>>>>>> <matthias@confluent.io> wrote:
>>>>>>>> 
>>>>>>>>> Hey,
>>>>>>>> 
>>>>>>>>> we just added a new FAQ entry for upcoming CP 3.2
>>>>>>>>> release that answers your question. I just c&p it
>>>>>>>>> here. More concrete answer below.
>>>>>>>> 
>>>>>>>>>>>> If you get an exception similar to the one
>>>>>>>>>>>> shown below, there are multiple possible
>>>>>>>>>>>> causes:
>>>>>>>>>>>> 
>>>>>>>>>>>> Exception in thread "StreamThread-1"
>>>>>>>>>>>> java.lang.IllegalArgumentException: Invalid timestamp -1
>>>>>>>>>>>> at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>>>>>>>>>>>> 
>>>>>>>>>>>> This error means that the timestamp extractor
>>>>>>>>>>>> of your Kafka Streams application failed to
>>>>>>>>>>>> extract a valid timestamp from a record.
>>>>>>>>>>>> Typically, this points to a problem with the
>>>>>>>>>>>> record (e.g., the record does not contain a
>>>>>>>>>>>> timestamp at all), but it could also indicate
>>>>>>>>>>>> a problem or bug in the timestamp extractor
>>>>>>>>>>>> used by the application.
>>>>>>>>>>>> 
>>>>>>>>>>>> When does a record not contain a valid
>>>>>>>>>>>> timestamp:
>>>>>>>>>>>> 
>>>>>>>>>>>> If you are using the default 
>>>>>>>>>>>> ConsumerRecordTimestampExtractor, it is most
>>>>>>>>>>>> likely that your records do not carry an
>>>>>>>>>>>> embedded timestamp (embedded record
>>>>>>>>>>>> timestamps got introduced in Kafka's message
>>>>>>>>>>>> format in Kafka 0.10). This might happen, if
>>>>>>>>>>>> you consume a topic that is written by old
>>>>>>>>>>>> Kafka producer clients (ie, version 0.9 or
>>>>>>>>>>>> earlier) or third party producer clients. A
>>>>>>>>>>>> common situation where this may happen is
>>>>>>>>>>>> after upgrading your Kafka cluster from 0.9
>>>>>>>>>>>> to 0.10, where all the data that was
>>>>>>>>>>>> generated with 0.9 is not compatible with the
>>>>>>>>>>>> 0.10 message format. If you are using a
>>>>>>>>>>>> custom timestamp extractor, make sure that
>>>>>>>>>>>> your extractor is robust to missing 
>>>>>>>>>>>> timestamps in your records. For example, you
>>>>>>>>>>>> can return a default or estimated timestamp
>>>>>>>>>>>> if you cannot extract a valid timestamp
>>>>>>>>>>>> (maybe the timestamp field in your data is
>>>>>>>>>>>> just missing). You can also switch to
>>>>>>>>>>>> processing time semantics via 
>>>>>>>>>>>> WallclockTimestampExtractor; whether such a 
>>>>>>>>>>>> fallback is an appropriate response to this 
>>>>>>>>>>>> situation depends on your use case. However,
>>>>>>>>>>>> as a first step you should identify and fix
>>>>>>>>>>>> the root cause for why such problematic
>>>>>>>>>>>> records were written to Kafka in the first
>>>>>>>>>>>> place. In a second step you may consider
>>>>>>>>>>>> applying workarounds (as described above)
>>>>>>>>>>>> when dealing with such records (for example, 
>>>>>>>>>>>> if you need to process those records after
>>>>>>>>>>>> all). Another option is to regenerate the
>>>>>>>>>>>> records with correct timestamps and write
>>>>>>>>>>>> them to a new Kafka topic.
>>>>>>>>>>>> 
>>>>>>>>>>>> When the timestamp extractor causes the
>>>>>>>>>>>> problem:
>>>>>>>>>>>> 
>>>>>>>>>>>> In this situation you should debug and fix
>>>>>>>>>>>> the erroneous extractor. If the extractor is
>>>>>>>>>>>> built into Kafka, please report the bug to
>>>>>>>>>>>> the Kafka developer mailing list at
>>>>>>>>>>>> dev@kafka.apache.org (see
>>>>>>>>>>>> instructions
>>>>>>>>>>>> http://kafka.apache.org/contact); in the
>>>>>>>>>>>> meantime, you may write a custom timestamp 
>>>>>>>>>>>> extractor that fixes the problem and
>>>>>>>>>>>> configure your application to use that
>>>>>>>>>>>> extractor for the time being.
>>>>>>>> 
>>>>>>>> 
>>>>>>>>> To address your questions more concretely:
>>>>>>>>
>>>>>>>>> 1. Yes and no: yes for any new data you write to
>>>>>>>>> your topic, no for any already written data that
>>>>>>>>> does not have a valid timestamp set.
>>>>>>>>> 2. Default is create time.
>>>>>>>>> 3. Config parameter "message.timestamp.type". It's a
>>>>>>>>> broker-side per-topic setting (however, be aware that
>>>>>>>>> the Java KafkaProducer does verify the timestamp locally
>>>>>>>>> before sending the message to the broker, thus on
>>>>>>>>> -1 there will be the client-side exception you did
>>>>>>>>> observe).
>>>>>>>>> 4. I assume that you do consume different
>>>>>>>>> topics with different TS fields in your records.
>>>>>>>> 
>>>>>>>>> Also have a look at: 
>>>>>>>>> http://docs.confluent.io/current/streams/concepts.html#time
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>> - -Matthias
>>>>>>>> 
>>>>>>>> 
>>>>>>>>> On 10/28/16 5:42 AM, Debasish Ghosh wrote:
>>>>>>>>>>>> I am actually using 0.10.0 and NOT 0.10.1 as
>>>>>>>>>>>> I mentioned in the last mail. And I am using
>>>>>>>>>>>> Kafka within a DC/OS cluster under AWS.
>>>>>>>>>>>> 
>>>>>>>>>>>> The version that I mentioned works ok is on
>>>>>>>>>>>> my local machine using a local Kafka
>>>>>>>>>>>> installation. And it works for both single
>>>>>>>>>>>> broker and multi broker scenario.
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks.
>>>>>>>>>>>> 
>>>>>>>>>>>> On Fri, Oct 28, 2016 at 1:37 PM, Debasish
>>>>>>>>>>>> Ghosh <ghosh.debasish@gmail.com> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hello -
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I am a beginner in Kafka .. with my first
>>>>>>>>>>>>> Kafka streams application ..
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I have a streams application that reads
>>>>>>>>>>>>> from a topic, does some transformation on
>>>>>>>>>>>>> the data and writes to another topic. The
>>>>>>>>>>>>> record that I manipulate is a CSV record.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> It runs fine when I run it on a local
>>>>>>>>>>>>> Kafka instance.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> However when I run it on an AWS cluster, I
>>>>>>>>>>>>> get the following exception when I try to
>>>>>>>>>>>>> produce the transformed record into the
>>>>>>>>>>>>> target topic.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
>>>>>>>>>>>>> at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>>>>>>>>> at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>>>>>>>>> at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
>>>>>>>>>>>>> at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>>>>>>>>> at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Looks like the timestamp passed to the 
>>>>>>>>>>>>> ProducerRecord is -1, though I am not
>>>>>>>>>>>>> passing any timestamp explicitly. I am not
>>>>>>>>>>>>> sure why this happens. But I see that the
>>>>>>>>>>>>> Javadoc for ProducerRecord says the
>>>>>>>>>>>>> following ..
>>>>>>>>>>>>> 
>>>>>>>>>>>>> The record also has an associated
>>>>>>>>>>>>> timestamp. If the user did not provide a
>>>>>>>>>>>>>> timestamp, the producer will stamp the
>>>>>>>>>>>>>> record with its current time. The
>>>>>>>>>>>>>> timestamp eventually used by Kafka
>>>>>>>>>>>>>> depends on the timestamp type configured
>>>>>>>>>>>>>> for the topic. If the topic is configured
>>>>>>>>>>>>>> to use CreateTime, the timestamp in the
>>>>>>>>>>>>>> producer record will be used by the
>>>>>>>>>>>>>> broker. If the topic is configured to
>>>>>>>>>>>>>> use LogAppendTime, the timestamp in the
>>>>>>>>>>>>>> producer record will be overwritten by
>>>>>>>>>>>>>> the broker with the broker local time
>>>>>>>>>>>>>> when it appends the message to its log.
>>>>>>>>>>>>>> In either of the cases above, the
>>>>>>>>>>>>>> timestamp that has actually been used
>>>>>>>>>>>>>> will be returned to user in 
>>>>>>>>>>>>>> RecordMetadata
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 1. Will this problem be solved if I
>>>>>>>>>>>>> configure the topic with LogAppendTime or
>>>>>>>>>>>>> CreateTime explicitly ? 2. What is the
>>>>>>>>>>>>> default setting of this property in a newly
>>>>>>>>>>>>> created topic ? 3. How do I change it (what
>>>>>>>>>>>>> is the name of the property to be set) ?
>>>>>>>>>>>>> 4. Any idea why I face this problem in the
>>>>>>>>>>>>> cluster mode but not in the local mode ?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> BTW I am using 0.10.1.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Any help / pointer will be appreciated ?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> regards.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> -- Debasish Ghosh
>>>>>>>>>>>>> http://manning.com/ghosh2 
>>>>>>>>>>>>> http://manning.com/ghosh
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Twttr: @debasishg Blog: 
>>>>>>>>>>>>> http://debasishg.blogspot.com Code: 
>>>>>>>>>>>>> http://github.com/debasishg
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> -- Debasish Ghosh http://manning.com/ghosh2 
>>>>> http://manning.com/ghosh
>>>>> 
>>>>> Twttr: @debasishg Blog: http://debasishg.blogspot.com
>>>>> Code: http://github.com/debasishg
>>>>> 
>>>> 
>>>> 
>>>> 
>> 
> 
> 
-----BEGIN PGP SIGNATURE-----
Comment: GPGTools - https://gpgtools.org

iQIcBAEBCgAGBQJYFs69AAoJECnhiMLycopPLmkQAKGTE0GRxduQ7VHXCrBC2EJR
ZiObYfPaI1dqkPz0m/rEgPAcUpBB855uzs9Oa77hngfT3xfwa05kDDrH+eEgsYgI
YdqLt02inPd11UKztj+i8x1mQAqVB4IGzCQExmGgX9yDWpXpVgv+t+3gByVaoryK
18LxmbVHOeIiUEpsjyXeknlZUomOZ1fO30nSLCL+zmOjLjOnHicd5IYTOrRuBIGR
J2bTvI0bxuDPJlIDKW7/nHC9OfInHcyO2B8k0LPK7IFgDlb/z+ICazbiIwjzkwrP
fsuLla11N8+FkH3GNZOtP/gAgVJXmhVerZtAKn85xOGajQcs5o3MWJV3YCBKXtOq
k+gqxP4GbWUIP/Zz+5QxD6bRugj0I86XiaMOrBfUjkXBtnDJCdWhDAP3WIZMYoFu
ZbT33kakDMPRLOlxEiPkM2bPYe99SqzSh+v/Z2fiI8sqsGVgkPcwEkAWZupjy6b4
/c2CHXK1wnhoRpOKpQUoKparZFp5v6cY56/vPjxU32soXyG0rx/0Jd8JHv7x2AFD
0xi7PvA6QvIF9m8RdMddrMjwQ5VW1yeYL20ZDf7mGQr+S96rHg7zqdYhwjkgWERx
lBJQ8Q6sHF7XeRZvoShCxZ06BtD7LTokintAUjaV3+jX/pvxNGgBrQ7aD7Lyw4k/
tnDi1J+ISJanK15EGPg6
=gy51
-----END PGP SIGNATURE-----

Re: Problem with timestamp in Producer

Posted by Debasish Ghosh <gh...@gmail.com>.
I think I found out what happened .. I was installing Kafka under DC/OS on
AWS and following this doc https://dcos.io/docs/1.8/usage/tutorials/kafka/ for
kafka installation. This works fine and installs kafka version 0.10.0.

But in the example where it shows how to produce and consume messages, this
doc says the following ..

core@ip-10-0-6-153 ~ $ docker run -it mesosphere/kafka-client
> root@7d0aed75e582:/bin# echo "Hello, World." |
> ./kafka-console-producer.sh --broker-list KAFKA_ADDRESS:PORT --topic topic1


The problem is the docker run of kafka-client pulls in version 0.9.0. When
I use kafka-console-producer from this client, it produces messages based
on 0.9.0 which obviously cannot be consumed by 0.10.0 brokers.

Thanks for pointing in this direction .. I think it will be fixed if I
install a 0.10.0 client in its place.
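
In case it helps someone else hitting this, one quick way to check which
client version actually ends up on the classpath (a rough sketch; it assumes
the kafka-clients jar in use provides org.apache.kafka.common.utils.AppInfoParser,
the class the clients themselves use to log their version) is to run a tiny
main class against the same classpath as the application:

import org.apache.kafka.common.utils.AppInfoParser;

public class PrintClientVersion {
    public static void main(String[] args) {
        // Prints the version of the kafka-clients jar that was loaded,
        // e.g. a 0.9.x vs a 0.10.x client.
        System.out.println("kafka-clients version: " + AppInfoParser.getVersion());
    }
}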

regards.

On Sunday 30 October 2016, Matthias J. Sax <ma...@confluent.io> wrote:

> -----BEGIN PGP SIGNED MESSAGE-----
> Hash: SHA512
>
> The simplest way should be to check the java classpath.
>
> Insert an
>
> echo $CLASSPATH
>
> at the end of bin/kafka-run-class.sh
>
> Then run bin/kafka-console-producer.sh with no argument.
>
> You should see the classpath printed out. Look for
> 'kafka-clients-XXX.jar' -- XXX will be the version number.
>
>
> - -Matthias
>
>
> On 10/29/16 12:11 AM, Debasish Ghosh wrote:
> > Hello Mathias -
> >
> > Regarding ..
> >
> > In case you do have 0.10 brokers, it might however happen, that
> > bin/kafka-console-producer.sh
> >> does use 0.9 producer.
> >
> >
> > How can I check this ?
> >
> > Thanks!
> >
> > On Sat, Oct 29, 2016 at 12:23 PM, Debasish Ghosh
> > <ghosh.debasish@gmail.com> wrote:
> >
> >> I agree .. the problem is DC/OS still ships the older version.
> >> Let me check if I can upgrade this ..
> >>
> >> Thanks!
> >>
> >> On Sat, Oct 29, 2016 at 12:21 PM, Matthias J. Sax
> >> <matthias@confluent.io> wrote:
> >>
> > Btw: I would highly recommend to use Kafka 0.10.1 -- there are
> > many new Streams feature and usability improvements and bug fixes.
> >
> > -Matthias
> >
> > On 10/28/16 11:42 PM, Matthias J. Sax wrote:
> >>>>> That sounds reasonable. However, I am wondering how your
> >>>>> Streams application can connect to 0.9 broker in the first
> >>>>> place. Streams internally uses standard Kafka clients, and
> >>>>> those are not backward compatible. Thus, the 0.10 Streams
> >>>>> clients should not be able to connect to 0.9 broker.
> >>>>>
> >>>>> In case you do have 0.10 brokers, it might however happen,
> >>>>> that bin/kafka-console-producer.sh does use 0.9 producer.
> >>>>> Brokers are backward compatible, thus a 0.9 producer can
> >>>>> write to a 0.10 broker (and in this case the record TS would be
> >>>>> invalid). I assume that in your local environment you
> >>>>> are using the 0.10 bin/kafka-console-producer.sh and thus all
> >>>>> works fine.
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>>
> >>>>> On 10/28/16 11:00 PM, Debasish Ghosh wrote:
> >>>>>> Hello Mathias -
> >>>>>
> >>>>>> Thanks a lot for the response. I think what may be
> >>>>>> happening is a version mismatch between the development &
> >>>>>> deployment versions of Kafka. The Kafka streams
> >>>>>> application that I developed uses 0.10.0 based libraries.
> >>>>>> And my local environment contains a server installation
> >>>>>> of the same version. Hence it works ok in my local
> >>>>>> environment.
> >>>>>
> >>>>>> But the DC/OS 1.8 deploys 0.9.0 of Kafka when I install
> >>>>>> the service through DC/OS cli. And I use this version to
> >>>>>> load records into the input topic. And try to consume
> >>>>>> using the deployed streams application which I developed
> >>>>>> using 0.10.0. Hence the producer did not put the
> >>>>>> timestamp while the consumer expects to have one.
> >>>>>
> >>>>>> I need to check if 0.10.x is available for DC/OS ..
> >>>>>
> >>>>>> Thanks again for your suggestions.
> >>>>>
> >>>>>
> >>>>>> On Sat, Oct 29, 2016 at 3:23 AM, Matthias J. Sax
> >>>>>> <matthias@confluent.io> wrote:
> >>>>>
> >>>>>> Hey,
> >>>>>
> >>>>>> we just added a new FAQ entry for upcoming CP 3.2 release
> >>>>>> that answers your question. I just c&p it here. More
> >>>>>> concrete answer below.
> >>>>>
> >>>>>>>>> If you get an exception similar to the one shown
> >>>>>>>>> below, there are multiple possible causes:
> >>>>>>>>>
> >>>>>>>>> Exception in thread "StreamThread-1"
> >>>>>>>>> java.lang.IllegalArgumentException: Invalid timestamp -1
> >>>>>>>>> at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
> >>>>>>>>>
> >>>>>>>>> This error means that the timestamp extractor of
> >>>>>>>>> your Kafka Streams application failed to extract a
> >>>>>>>>> valid timestamp from a record. Typically, this
> >>>>>>>>> points to a problem with the record (e.g., the
> >>>>>>>>> record does not contain a timestamp at all), but it
> >>>>>>>>> could also indicate a problem or bug in the
> >>>>>>>>> timestamp extractor used by the application.
> >>>>>>>>>
> >>>>>>>>> When does a record not contain a valid timestamp:
> >>>>>>>>>
> >>>>>>>>> If you are using the default
> >>>>>>>>> ConsumerRecordTimestampExtractor, it is most likely
> >>>>>>>>> that your records do not carry an embedded
> >>>>>>>>> timestamp (embedded record timestamps got
> >>>>>>>>> introduced in Kafka's message format in Kafka
> >>>>>>>>> 0.10). This might happen, if you consume a topic
> >>>>>>>>> that is written by old Kafka producer clients (ie,
> >>>>>>>>> version 0.9 or earlier) or third party producer
> >>>>>>>>> clients. A common situation where this may happen
> >>>>>>>>> is after upgrading your Kafka cluster from 0.9 to
> >>>>>>>>> 0.10, where all the data that was generated with
> >>>>>>>>> 0.9 is not compatible with the 0.10 message format.
> >>>>>>>>> If you are using a custom timestamp extractor,
> >>>>>>>>> make sure that your extractor is robust to missing
> >>>>>>>>> timestamps in your records. For example, you can
> >>>>>>>>> return a default or estimated timestamp if you
> >>>>>>>>> cannot extract a valid timestamp (maybe the
> >>>>>>>>> timestamp field in your data is just missing). You
> >>>>>>>>> can also switch to processing time semantics via
> >>>>>>>>> WallclockTimestampExtractor; whether such a
> >>>>>>>>> fallback is an appropriate response to this
> >>>>>>>>> situation depends on your use case. However, as a
> >>>>>>>>> first step you should identify and fix the root
> >>>>>>>>> cause for why such problematic records were written
> >>>>>>>>> to Kafka in the first place. In a second step you
> >>>>>>>>> may consider applying workarounds (as described
> >>>>>>>>> above) when dealing with such records (for example,
> >>>>>>>>> if you need to process those records after all).
> >>>>>>>>> Another option is to regenerate the records with
> >>>>>>>>> correct timestamps and write them to a new Kafka
> >>>>>>>>> topic.
> >>>>>>>>>
> >>>>>>>>> When the timestamp extractor causes the problem:
> >>>>>>>>>
> >>>>>>>>> In this situation you should debug and fix the
> >>>>>>>>> erroneous extractor. If the extractor is built into
> >>>>>>>>> Kafka, please report the bug to the Kafka developer
> >>>>>>>>> mailing list at dev@kafka.apache.org (see
> >>>>>>>>> instructions http://kafka.apache.org/contact); in
> >>>>>>>>> the meantime, you may write a custom timestamp
> >>>>>>>>> extractor that fixes the problem and configure your
> >>>>>>>>> application to use that extractor for the time
> >>>>>>>>> being.
> >>>>>
> >>>>>
> >>>>>> To address your questions more concretely:
> >>>>>
> >>>>>> 1. Yes and no: yes for any new data you write to your
> >>>>>> topic, no for any already written data that does not
> >>>>>> have a valid timestamp set.
> >>>>>> 2. Default is create time.
> >>>>>> 3. Config parameter "message.timestamp.type". It's a broker-side
> >>>>>> per-topic setting (however, be aware that the Java
> >>>>>> KafkaProducer does verify the timestamp locally before
> >>>>>> sending the message to the broker, thus on -1 there will
> >>>>>> be the client-side exception you did observe).
> >>>>>> 4. I assume that you do consume different topics with different TS
> >>>>>> fields in your records.
> >>>>>
> >>>>>> Also have a look at:
> >>>>>> http://docs.confluent.io/current/streams/concepts.html#time
> >>>>>
> >>>>>
> >>>>>
> >>>>>> - -Matthias
> >>>>>
> >>>>>
> >>>>>> On 10/28/16 5:42 AM, Debasish Ghosh wrote:
> >>>>>>>>> I am actually using 0.10.0 and NOT 0.10.1 as I
> >>>>>>>>> mentioned in the last mail. And I am using Kafka
> >>>>>>>>> within a DC/OS cluster under AWS.
> >>>>>>>>>
> >>>>>>>>> The version that I mentioned works ok is on my
> >>>>>>>>> local machine using a local Kafka installation. And
> >>>>>>>>> it works for both single broker and multi broker
> >>>>>>>>> scenario.
> >>>>>>>>>
> >>>>>>>>> Thanks.
> >>>>>>>>>
> >>>>>>>>> On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh
> >>>>>>>>> <ghosh.debasish@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hello -
> >>>>>>>>>>
> >>>>>>>>>> I am a beginner in Kafka .. with my first Kafka
> >>>>>>>>>> streams application ..
> >>>>>>>>>>
> >>>>>>>>>> I have a streams application that reads from a
> >>>>>>>>>> topic, does some transformation on the data and
> >>>>>>>>>> writes to another topic. The record that I
> >>>>>>>>>> manipulate is a CSV record.
> >>>>>>>>>>
> >>>>>>>>>> It runs fine when I run it on a local Kafka
> >>>>>>>>>> instance.
> >>>>>>>>>>
> >>>>>>>>>> However when I run it on an AWS cluster, I get
> >>>>>>>>>> the following exception when I try to produce
> >>>>>>>>>> the transformed record into the target topic.
> >>>>>>>>>>
> >>>>>>>>>> Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
> >>>>>>>>>> at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
> >>>>>>>>>> at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
> >>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>>>>>>>>> at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
> >>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>>>>>>>>> at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
> >>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
> >>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
> >>>>>>>>>> at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
> >>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>>>>>>>>> at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
> >>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>>>>>>>>> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
> >>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
> >>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
> >>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
> >>>>>>>>>>
> >>>>>>>>>> Looks like the timestamp passed to the
> >>>>>>>>>> ProducerRecord is -1, though I am not passing any
> >>>>>>>>>> timestamp explicitly. I am not sure why this
> >>>>>>>>>> happens. But I see that the Javadoc for
> >>>>>>>>>> ProducerRecord says the following ..
> >>>>>>>>>>
> >>>>>>>>>> The record also has an associated timestamp. If
> >>>>>>>>>> the user did not provide a
> >>>>>>>>>>> timestamp, the producer will stamp the record
> >>>>>>>>>>> with its current time. The timestamp eventually
> >>>>>>>>>>> used by Kafka depends on the timestamp type
> >>>>>>>>>>> configured for the topic. If the topic is
> >>>>>>>>>>> configured to use CreateTime, the timestamp in
> >>>>>>>>>>> the producer record will be used by the broker.
> >>>>>>>>>>> If the topic is configured to use
> >>>>>>>>>>> LogAppendTime, the timestamp in the producer
> >>>>>>>>>>> record will be overwritten by the broker with
> >>>>>>>>>>> the broker local time when it appends the
> >>>>>>>>>>> message to its log. In either of the cases
> >>>>>>>>>>> above, the timestamp that has actually been
> >>>>>>>>>>> used will be returned to user in
> >>>>>>>>>>> RecordMetadata
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 1. Will this problem be solved if I configure the
> >>>>>>>>>> topic with LogAppendTime or CreateTime explicitly
> >>>>>>>>>> ? 2. What is the default setting of this property
> >>>>>>>>>> in a newly created topic ? 3. How do I change it
> >>>>>>>>>> (what is the name of the property to be set) ? 4.
> >>>>>>>>>> Any idea why I face this problem in the cluster
> >>>>>>>>>> mode but not in the local mode ?
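(On question 1, for what it's worth: a 0.10 producer can also stamp each record itself, since the 0.10 ProducerRecord constructors accept an explicit timestamp. A minimal sketch -- topic, key and value are placeholders, not from the original mail:

import org.apache.kafka.clients.producer.ProducerRecord;

public class ExplicitTimestampExample {
    public static void main(String[] args) {
        // The five-argument 0.10 constructor takes an explicit, non-negative
        // timestamp; topic, key and value here are made-up placeholders.
        ProducerRecord<String, String> record = new ProducerRecord<>(
                "target-topic", null, System.currentTimeMillis(), "key", "some,csv,record");
        System.out.println(record.timestamp());
    }
}

Whether stamping records client-side is the right fix still depends on why the timestamp ended up as -1 in the first place.)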
> >>>>>>>>>>
> >>>>>>>>>> BTW I am using 0.10.1.
> >>>>>>>>>>
> >>>>>>>>>> Any help / pointer will be appreciated ?
> >>>>>>>>>>
> >>>>>>>>>> regards.
> >>>>>>>>>>
> >>>>>>>>>> -- Debasish Ghosh http://manning.com/ghosh2
> >>>>>>>>>> http://manning.com/ghosh
> >>>>>>>>>>
> >>>>>>>>>> Twttr: @debasishg Blog:
> >>>>>>>>>> http://debasishg.blogspot.com Code:
> >>>>>>>>>> http://github.com/debasishg
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>
> >>
> >>
> >>
> >> -- Debasish Ghosh http://manning.com/ghosh2
> >> http://manning.com/ghosh
> >>
> >> Twttr: @debasishg Blog: http://debasishg.blogspot.com Code:
> >> http://github.com/debasishg
> >>
> >
> >
> >
>


-- 
Sent from my iPhone

Re: Problem with timestamp in Producer

Posted by "Matthias J. Sax" <ma...@confluent.io>.
-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA512

The simplest way should be to check the Java classpath.

Insert an

echo $CLASSPATH

at the end of bin/kafka-run-class.sh

Then run bin/kafka-console-producer.sh with no arguments.

You should see the classpath printed out. Look for
'kafka-clients-XXX.jar' -- XXX will be the version number.
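If the classpath is long, filtering it makes the client jar easier to spot -- for example (assuming you added the echo line as described above; the exact jar name depends on your installation):

bin/kafka-console-producer.sh 2>&1 | tr ':' '\n' | grep kafka-clients

which should print something like kafka-clients-0.10.1.0.jar for a 0.10.1 client.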


- -Matthias


On 10/29/16 12:11 AM, Debasish Ghosh wrote:
> Hello Mathias -
> 
> Regarding ..
> 
> In case you do have 0.10 brokers, it might however happen, that 
> bin/kafka-console-producer.sh
>> does use 0.9 producer.
> 
> 
> How can I check this ?
> 
> Thanks!
> 
> On Sat, Oct 29, 2016 at 12:23 PM, Debasish Ghosh
> <gh...@gmail.com> wrote:
> 
>> I agree .. the problem is DC/OS still ships the older version.
>> Let me check if I can upgrade this ..
>> 
>> Thanks!
>> 
>> On Sat, Oct 29, 2016 at 12:21 PM, Matthias J. Sax
>> <ma...@confluent.io> wrote:
>> 
> Btw: I would highly recommend using Kafka 0.10.1 -- there are
> many new Streams features, usability improvements, and bug fixes.
> 
> -Matthias
> 
> On 10/28/16 11:42 PM, Matthias J. Sax wrote:
>>>>> That sounds reasonable. However, I am wondering how your
>>>>> Streams application can connect to 0.9 broker in the first
>>>>> place. Streams internally uses standard Kafka clients, and
>>>>> those are not backward compatible. Thus, the 0.10 Streams
>>>>> clients should not be able to connect to 0.9 broker.
>>>>> 
>>>>> In case you do have 0.10 brokers, it might however happen
>>>>> that bin/kafka-console-producer.sh does use a 0.9 producer.
>>>>> Brokers are backward compatible, thus a 0.9 producer can
>>>>> write to a 0.10 broker (and in this case the record TS would be
>>>>> invalid), while I assume that in your local environment you
>>>>> are using the 0.10 bin/kafka-console-producer.sh and thus all
>>>>> works fine.
>>>>> 
>>>>> 
>>>>> -Matthias
>>>>> 
>>>>> 
>>>>> On 10/28/16 11:00 PM, Debasish Ghosh wrote:
>>>>>> Hello Mathias -
>>>>> 
>>>>>> Thanks a lot for the response. I think what may be
>>>>>> happening is a version mismatch between the development &
>>>>>> deployment versions of Kafka. The Kafka streams
>>>>>> application that I developed uses 0.10.0 based libraries.
>>>>>> And my local environment contains a server installation
>>>>>> of the same version. Hence it works ok in my local
>>>>>> environment.
>>>>> 
>>>>>> But the DC/OS 1.8 deploys 0.9.0 of Kafka when I install
>>>>>> the service through DC/OS cli. And I use this version to
>>>>>> load records into the input topic. And try to consume
>>>>>> using the deployed streams application which I developed
>>>>>> using 0.10.0. Hence the producer did not put the
>>>>>> timestamp while the consumer expects to have one.
>>>>> 
>>>>>> I need to check if 0.10.x is available for DC/OS ..
>>>>> 
>>>>>> Thanks again for your suggestions.
>>>>> 
>>>>> 
>>>>>> On Sat, Oct 29, 2016 at 3:23 AM, Matthias J. Sax 
>>>>>> <ma...@confluent.io> wrote:
>>>>> 
>>>>>> Hey,
>>>>> 
>>>>>> we just added a new FAQ entry for upcoming CP 3.2 release
>>>>>> that answers your question. I just c&p it here. More
>>>>>> concrete answer below.
>>>>> 
>>>>>>>>> If you get an exception similar to the one shown
>>>>>>>>> below, there are multiple possible causes:
>>>>>>>>> 
>>>>>>>>> Exception in thread "StreamThread-1"
>>>>>>>>> java.lang.IllegalArgumentException: Invalid timestamp -1 at
>>>>>>>>> org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>>>>>>>>> 
>>>>>>>>> This error means that the timestamp extractor of
>>>>>>>>> your Kafka Streams application failed to extract a
>>>>>>>>> valid timestamp from a record. Typically, this
>>>>>>>>> points to a problem with the record (e.g., the
>>>>>>>>> record does not contain a timestamp at all), but it
>>>>>>>>> could also indicate a problem or bug in the 
>>>>>>>>> timestamp extractor used by the application.
>>>>>>>>> 
>>>>>>>>> When does a record not contain a valid timestamp:
>>>>>>>>> 
>>>>>>>>> If you are using the default 
>>>>>>>>> ConsumerRecordTimestampExtractor, it is most likely
>>>>>>>>> that your records do not carry an embedded
>>>>>>>>> timestamp (embedded record timestamps got
>>>>>>>>> introduced in Kafka's message format in Kafka
>>>>>>>>> 0.10). This might happen, if you consume a topic 
>>>>>>>>> that is written by old Kafka producer clients (ie,
>>>>>>>>> version 0.9 or earlier) or third party producer
>>>>>>>>> clients. A common situation where this may happen
>>>>>>>>> is after upgrading your Kafka cluster from 0.9 to
>>>>>>>>> 0.10, where all the data that was generated with
>>>>>>>>> 0.9 is not compatible with the 0.10 message format.
>>>>>>>>> If you are using a custom timestamp extractor,
>>>>>>>>> make sure that your extractor is robust to missing
>>>>>>>>> timestamps in your records. For example, you can
>>>>>>>>> return a default or estimated timestamp if you
>>>>>>>>> cannot extract a valid timestamp (maybe the
>>>>>>>>> timestamp field in your data is just missing). You
>>>>>>>>> can also switch to processing time semantics via 
>>>>>>>>> WallclockTimestampExtractor; whether such a
>>>>>>>>> fallback is an appropriate response to this
>>>>>>>>> situation depends on your use case. However, as a
>>>>>>>>> first step you should identify and fix the root
>>>>>>>>> cause for why such problematic records were written
>>>>>>>>> to Kafka in the first place. In a second step you 
>>>>>>>>> may consider applying workarounds (as described
>>>>>>>>> above) when dealing with such records (for example,
>>>>>>>>> if you need to process those records after all).
>>>>>>>>> Another option is to regenerate the records with
>>>>>>>>> correct timestamps and write them to a new Kafka
>>>>>>>>> topic.
>>>>>>>>> 
>>>>>>>>> When the timestamp extractor causes the problem:
>>>>>>>>> 
>>>>>>>>> In this situation you should debug and fix the
>>>>>>>>> erroneous extractor. If the extractor is built into
>>>>>>>>> Kafka, please report the bug to the Kafka developer
>>>>>>>>> mailing list at dev@kafka.apache.org (see
>>>>>>>>> instructions http://kafka.apache.org/contact); in
>>>>>>>>> the meantime, you may write a custom timestamp
>>>>>>>>> extractor that fixes the problem and configure your
>>>>>>>>> application to use that extractor for the time
>>>>>>>>> being.
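To make the extractor suggestion above concrete, a minimal sketch against the 0.10.x single-argument TimestampExtractor interface could look like this (the class name and the wall-clock fallback are only illustrative choices, not part of the FAQ):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class LenientTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(final ConsumerRecord<Object, Object> record) {
        final long embedded = record.timestamp();
        // Records written by 0.9 or third-party producers carry -1 here;
        // fall back to processing time so the downstream ProducerRecord
        // gets a valid (non-negative) timestamp instead of failing.
        return embedded >= 0 ? embedded : System.currentTimeMillis();
    }
}

It would be registered via StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG ("timestamp.extractor"); as noted above, whether such a fallback is appropriate depends on the use case.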
>>>>> 
>>>>> 
>>>>>> To address your questions more concretely:
>>>>> 
>>>>>> 1. Yes and no: yes for any new data you write to your topic;
>>>>>> no for any already written data that does not have a valid
>>>>>> timestamp set.
>>>>>> 2. Default is CreateTime.
>>>>>> 3. Config parameter "message.timestamp.type". It's a broker-side
>>>>>> per-topic setting (however, be aware that the Java KafkaProducer
>>>>>> does verify the timestamp locally before sending the message to
>>>>>> the broker, thus on -1 there will be the client-side exception
>>>>>> you did observe).
>>>>>> 4. I assume that you do consume different topics with different
>>>>>> TS fields in your records.
>>>>> 
>>>>>> Also have a look at: 
>>>>>> http://docs.confluent.io/current/streams/concepts.html#time
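For reference, the per-topic setting from point 3 can be changed with the stock tooling, e.g. along these lines (topic name and ZooKeeper address are placeholders):

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-topic --add-config message.timestamp.type=LogAppendTime

though, as point 1 says, that only helps for data produced after the change.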
>>>>>
>>>>>
>>>>>
>>>>>>
>>>>>> 
- -Matthias
>>>>> 
>>>>> 
>>>>>> On 10/28/16 5:42 AM, Debasish Ghosh wrote:
>>>>>>>>> I am actually using 0.10.0 and NOT 0.10.1 as I
>>>>>>>>> mentioned in the last mail. And I am using Kafka
>>>>>>>>> within a DC/OS cluster under AWS.
>>>>>>>>> 
>>>>>>>>> The version that I mentioned works ok is on my
>>>>>>>>> local machine using a local Kafka installation. And
>>>>>>>>> it works for both single broker and multi broker
>>>>>>>>> scenario.
>>>>>>>>> 
>>>>>>>>> Thanks.
>>>>>>>>> 
>>>>>>>>> On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh 
>>>>>>>>> <gh...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>>> Hello -
>>>>>>>>>> 
>>>>>>>>>> I am a beginner in Kafka .. with my first Kafka
>>>>>>>>>> streams application ..
>>>>>>>>>> 
>>>>>>>>>> I have a streams application that reads from a
>>>>>>>>>> topic, does some transformation on the data and
>>>>>>>>>> writes to another topic. The record that I
>>>>>>>>>> manipulate is a CSV record.
>>>>>>>>>> 
>>>>>>>>>> It runs fine when I run it on a local Kafka
>>>>>>>>>> instance.
>>>>>>>>>> 
>>>>>>>>>> However when I run it on an AWS cluster, I get
>>>>>>>>>> the following exception when I try to produce
>>>>>>>>>> the transformed record into the target topic.
>>>>>>>>>> 
>>>>>>>>>> Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
>>>>>>>>>> at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>>>>>>>>>> at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>>>>>> at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>>>>>> at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
>>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
>>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
>>>>>>>>>> at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
>>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>>>>>> at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>>>>>> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>>>>>>>>>> 
>>>>>>>>>> Looks like the timestamp passed to the
>>>>>>>>>> ProducerRecord is -1, though I am not passing any
>>>>>>>>>> timestamp explicitly. I am not sure why this
>>>>>>>>>> happens. But I see that the Javadoc for
>>>>>>>>>> ProducerRecord says the following ..
>>>>>>>>>> 
>>>>>>>>>> The record also has an associated timestamp. If
>>>>>>>>>> the user did not provide a
>>>>>>>>>>> timestamp, the producer will stamp the record
>>>>>>>>>>> with its current time. The timestamp eventually
>>>>>>>>>>> used by Kafka depends on the timestamp type
>>>>>>>>>>> configured for the topic. If the topic is
>>>>>>>>>>> configured to use CreateTime, the timestamp in
>>>>>>>>>>> the producer record will be used by the broker.
>>>>>>>>>>> If the topic is configured to use 
>>>>>>>>>>> LogAppendTime, the timestamp in the producer
>>>>>>>>>>> record will be overwritten by the broker with
>>>>>>>>>>> the broker local time when it appends the
>>>>>>>>>>> message to its log. In either of the cases
>>>>>>>>>>> above, the timestamp that has actually been
>>>>>>>>>>> used will be returned to user in
>>>>>>>>>>> RecordMetadata
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 1. Will this problem be solved if I configure the
>>>>>>>>>> topic with LogAppendTime or CreateTime explicitly
>>>>>>>>>> ? 2. What is the default setting of this property
>>>>>>>>>> in a newly created topic ? 3. How do I change it
>>>>>>>>>> (what is the name of the property to be set) ? 4.
>>>>>>>>>> Any idea why I face this problem in the cluster
>>>>>>>>>> mode but not in the local mode ?
>>>>>>>>>> 
>>>>>>>>>> BTW I am using 0.10.1.
>>>>>>>>>> 
>>>>>>>>>> Any help / pointer will be appreciated ?
>>>>>>>>>> 
>>>>>>>>>> regards.
>>>>>>>>>> 
>>>>>>>>>> -- Debasish Ghosh http://manning.com/ghosh2 
>>>>>>>>>> http://manning.com/ghosh
>>>>>>>>>> 
>>>>>>>>>> Twttr: @debasishg Blog:
>>>>>>>>>> http://debasishg.blogspot.com Code:
>>>>>>>>>> http://github.com/debasishg
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>> 
>> 
>> 
>> 
>> -- Debasish Ghosh http://manning.com/ghosh2 
>> http://manning.com/ghosh
>> 
>> Twttr: @debasishg Blog: http://debasishg.blogspot.com Code:
>> http://github.com/debasishg
>> 
> 
> 
> 

Re: Problem with timestamp in Producer

Posted by Debasish Ghosh <gh...@gmail.com>.
Hello Matthias -

Regarding ..

> In case you do have 0.10 brokers, it might however happen, that
> bin/kafka-console-producer.sh does use a 0.9 producer.

How can I check this?

Thanks!

On Sat, Oct 29, 2016 at 12:23 PM, Debasish Ghosh <gh...@gmail.com>
wrote:

> I agree .. the problem is DC/OS still ships the older version. Let me
> check if I can upgrade this ..
>
> Thanks!
>
> On Sat, Oct 29, 2016 at 12:21 PM, Matthias J. Sax <ma...@confluent.io>
> wrote:
>
>> -----BEGIN PGP SIGNED MESSAGE-----
>> Hash: SHA512
>>
>> Btw: I would highly recommend using Kafka 0.10.1 -- there are many
>> new Streams features, usability improvements, and bug fixes.
>>
>> - -Matthias
>>
>> On 10/28/16 11:42 PM, Matthias J. Sax wrote:
>> > That sounds reasonable. However, I am wondering how your Streams
>> > application can connect to 0.9 broker in the first place. Streams
>> > internally uses standard Kafka clients, and those are not backward
>> > compatible. Thus, the 0.10 Streams clients should not be able to
>> > connect to 0.9 broker.
>> >
>> > In case you do have 0.10 brokers, it might however happen, that
>> > bin/kafka-console-producer.sh does use a 0.9 producer. Brokers are
>> > backward compatible, thus a 0.9 producer can write to a 0.10 broker
>> > (and in this case the record TS would be invalid), while I assume that
>> > in your local environment you are using the 0.10
>> > bin/kafka-console-producer.sh and thus all works fine.
>> >
>> >
>> > -Matthias
>> >
>> >
>> > On 10/28/16 11:00 PM, Debasish Ghosh wrote:
>> >> Hello Mathias -
>> >
>> >> Thanks a lot for the response. I think what may be happening is
>> >> a version mismatch between the development & deployment versions
>> >> of Kafka. The Kafka streams application that I developed uses
>> >> 0.10.0 based libraries. And my local environment contains a
>> >> server installation of the same version. Hence it works ok in my
>> >> local environment.
>> >
>> >> But the DC/OS 1.8 deploys 0.9.0 of Kafka when I install the
>> >> service through DC/OS cli. And I use this version to load records
>> >> into the input topic. And try to consume using the deployed
>> >> streams application which I developed using 0.10.0. Hence the
>> >> producer did not put the timestamp while the consumer expects to
>> >> have one.
>> >
>> >> I need to check if 0.10.x is available for DC/OS ..
>> >
>> >> Thanks again for your suggestions.
>> >
>> >
>> >> On Sat, Oct 29, 2016 at 3:23 AM, Matthias J. Sax
>> >> <ma...@confluent.io> wrote:
>> >
>> >> Hey,
>> >
>> >> we just added a new FAQ entry for upcoming CP 3.2 release that
>> >> answers your question. I just c&p it here. More concrete answer
>> >> below.
>> >
>> >>>>> If you get an exception similar to the one shown below,
>> >>>>> there are multiple possible causes:
>> >>>>>
>> >>>>> Exception in thread "StreamThread-1"
>> >>>>> java.lang.IllegalArgumentException: Invalid timestamp -1 at
>> >>>>> org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>> >>>>>
>> >>>>> This error means that the timestamp extractor of your Kafka
>> >>>>>  Streams application failed to extract a valid timestamp
>> >>>>> from a record. Typically, this points to a problem with the
>> >>>>> record (e.g., the record does not contain a timestamp at
>> >>>>> all), but it could also indicate a problem or bug in the
>> >>>>> timestamp extractor used by the application.
>> >>>>>
>> >>>>> When does a record not contain a valid timestamp:
>> >>>>>
>> >>>>> If you are using the default
>> >>>>> ConsumerRecordTimestampExtractor, it is most likely that
>> >>>>> your records do not carry an embedded timestamp (embedded
>> >>>>> record timestamps got introduced in Kafka's message format
>> >>>>> in Kafka 0.10). This might happen, if you consume a topic
>> >>>>> that is written by old Kafka producer clients (ie, version
>> >>>>> 0.9 or earlier) or third party producer clients. A common
>> >>>>> situation where this may happen is after upgrading your
>> >>>>> Kafka cluster from 0.9 to 0.10, where all the data that was
>> >>>>> generated with 0.9 is not compatible with the 0.10 message
>> >>>>> format. If you are using a custom timestamp extractor, make
>> >>>>> sure that your extractor is robust to missing timestamps in
>> >>>>> your records. For example, you can return a default or
>> >>>>> estimated timestamp if you cannot extract a valid timestamp
>> >>>>> (maybe the timestamp field in your data is just missing).
>> >>>>> You can also switch to processing time semantics via
>> >>>>> WallclockTimestampExtractor; whether such a fallback is an
>> >>>>> appropriate response to this situation depends on your use
>> >>>>> case. However, as a first step you should identify and fix
>> >>>>> the root cause for why such problematic records were
>> >>>>> written to Kafka in the first place. In a second step you
>> >>>>> may consider applying workarounds (as described above) when
>> >>>>> dealing with such records (for example, if you need to
>> >>>>> process those records after all). Another option is to
>> >>>>> regenerate the records with correct timestamps and write
>> >>>>> them to a new Kafka topic.
>> >>>>>
>> >>>>> When the timestamp extractor causes the problem:
>> >>>>>
>> >>>>> In this situation you should debug and fix the erroneous
>> >>>>> extractor. If the extractor is built into Kafka, please
>> >>>>> report the bug to the Kafka developer mailing list at
>> >>>>> dev@kafka.apache.org (see instructions
>> >>>>> http://kafka.apache.org/contact); in the meantime, you may
>> >>>>> write a custom timestamp extractor that fixes the problem
>> >>>>> and configure your application to use that extractor for
>> >>>>> the time being.
>> >
>> >
>> >> To address your questions more concretely:
>> >
>> >> 1. Yes and no: yes for any new data you write to your topic; no
>> >> for any already written data that does not have a valid
>> >> timestamp set.
>> >> 2. Default is CreateTime.
>> >> 3. Config parameter "message.timestamp.type". It's a broker-side
>> >> per-topic setting (however, be aware that the Java KafkaProducer
>> >> does verify the timestamp locally before sending the message to the
>> >> broker, thus on -1 there will be the client-side exception you did
>> >> observe).
>> >> 4. I assume that you do consume different topics with different TS
>> >> fields in your records.
>> >
>> >> Also have a look at:
>> >> http://docs.confluent.io/current/streams/concepts.html#time
>> >
>> >
>> >
>> >> -Matthias
>> >
>> >
>> >> On 10/28/16 5:42 AM, Debasish Ghosh wrote:
>> >>>>> I am actually using 0.10.0 and NOT 0.10.1 as I mentioned
>> >>>>> in the last mail. And I am using Kafka within a DC/OS
>> >>>>> cluster under AWS.
>> >>>>>
>> >>>>> The version that I mentioned works ok is on my local
>> >>>>> machine using a local Kafka installation. And it works for
>> >>>>> both single broker and multi broker scenario.
>> >>>>>
>> >>>>> Thanks.
>> >>>>>
>> >>>>> On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh
>> >>>>> <gh...@gmail.com> wrote:
>> >>>>>
>> >>>>>> Hello -
>> >>>>>>
>> >>>>>> I am a beginner in Kafka .. with my first Kafka streams
>> >>>>>> application ..
>> >>>>>>
>> >>>>>> I have a streams application that reads from a topic,
>> >>>>>> does some transformation on the data and writes to
>> >>>>>> another topic. The record that I manipulate is a CSV
>> >>>>>> record.
>> >>>>>>
>> >>>>>> It runs fine when I run it on a local Kafka instance.
>> >>>>>>
>> >>>>>> However when I run it on an AWS cluster, I get the
>> >>>>>> following exception when I try to produce the
>> >>>>>> transformed record into the target topic.
>> >>>>>>
>> >>>>>> Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
>> >>>>>> at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>> >>>>>> at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
>> >>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>> >>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>> >>>>>> at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>> >>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>> >>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>> >>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>> >>>>>> at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
>> >>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>> >>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
>> >>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
>> >>>>>> at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
>> >>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>> >>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>> >>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>> >>>>>> at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>> >>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>> >>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>> >>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>> >>>>>> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
>> >>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
>> >>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
>> >>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>> >>>>>>
>> >>>>>> Looks like the timestamp passed to the ProducerRecord is
>> >>>>>> -1, though I am not passing any timestamp explicitly. I
>> >>>>>> am not sure why this happens. But I see that the Javadoc
>> >>>>>> for ProducerRecord says the following ..
>> >>>>>>
>> >>>>>> The record also has an associated timestamp. If the user
>> >>>>>> did not provide a
>> >>>>>>> timestamp, the producer will stamp the record with its
>> >>>>>>> current time. The timestamp eventually used by Kafka
>> >>>>>>> depends on the timestamp type configured for the
>> >>>>>>> topic. If the topic is configured to use CreateTime,
>> >>>>>>> the timestamp in the producer record will be used by
>> >>>>>>> the broker. If the topic is configured to use
>> >>>>>>> LogAppendTime, the timestamp in the producer record
>> >>>>>>> will be overwritten by the broker with the broker local
>> >>>>>>> time when it appends the message to its log. In either
>> >>>>>>> of the cases above, the timestamp that has actually
>> >>>>>>> been used will be returned to user in RecordMetadata
>> >>>>>>
>> >>>>>>
>> >>>>>> 1. Will this problem be solved if I configure the topic
>> >>>>>> with LogAppendTime or CreateTime explicitly ? 2. What is
>> >>>>>> the default setting of this property in a newly created
>> >>>>>> topic ? 3. How do I change it (what is the name of the
>> >>>>>> property to be set) ? 4. Any idea why I face this
>> >>>>>> problem in the cluster mode but not in the local mode ?
>> >>>>>>
>> >>>>>> BTW I am using 0.10.1.
>> >>>>>>
>> >>>>>> Any help / pointer will be appreciated ?
>> >>>>>>
>> >>>>>> regards.
>> >>>>>>
>> >>>>>> -- Debasish Ghosh http://manning.com/ghosh2
>> >>>>>> http://manning.com/ghosh
>> >>>>>>
>> >>>>>> Twttr: @debasishg Blog: http://debasishg.blogspot.com
>> >>>>>> Code: http://github.com/debasishg
>> >>>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>
>> >
>> >
>> >
>> >
>>
>
>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>



-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Re: Problem with timestamp in Producer

Posted by Debasish Ghosh <gh...@gmail.com>.
I agree .. the problem is DC/OS still ships the older version. Let me check
if I can upgrade this ..

Thanks!

On Sat, Oct 29, 2016 at 12:21 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> -----BEGIN PGP SIGNED MESSAGE-----
> Hash: SHA512
>
> Btw: I would highly recommend using Kafka 0.10.1 -- there are many
> new Streams features, usability improvements, and bug fixes.
>
> - -Matthias
>
> On 10/28/16 11:42 PM, Matthias J. Sax wrote:
> > That sounds reasonable. However, I am wondering how your Streams
> > application can connect to 0.9 broker in the first place. Streams
> > internally uses standard Kafka clients, and those are not backward
> > compatible. Thus, the 0.10 Streams clients should not be able to
> > connect to 0.9 broker.
> >
> > In case you do have 0.10 brokers, it might however happen, that
> > bin/kafka-console-producer.sh does use a 0.9 producer. Brokers are
> > backward compatible, thus a 0.9 producer can write to a 0.10 broker
> > (and in this case the record TS would be invalid), while I assume that
> > in your local environment you are using the 0.10
> > bin/kafka-console-producer.sh and thus all works fine.
> >
> >
> > -Matthias
> >
> >
> > On 10/28/16 11:00 PM, Debasish Ghosh wrote:
> >> Hello Mathias -
> >
> >> Thanks a lot for the response. I think what may be happening is
> >> a version mismatch between the development & deployment versions
> >> of Kafka. The Kafka streams application that I developed uses
> >> 0.10.0 based libraries. And my local environment contains a
> >> server installation of the same version. Hence it works ok in my
> >> local environment.
> >
> >> But the DC/OS 1.8 deploys 0.9.0 of Kafka when I install the
> >> service through DC/OS cli. And I use this version to load records
> >> into the input topic. And try to consume using the deployed
> >> streams application which I developed using 0.10.0. Hence the
> >> producer did not put the timestamp while the consumer expects to
> >> have one.
> >
> >> I need to check if 0.10.x is available for DC/OS ..
> >
> >> Thanks again for your suggestions.
> >
> >
> >> On Sat, Oct 29, 2016 at 3:23 AM, Matthias J. Sax
> >> <ma...@confluent.io> wrote:
> >
> >> Hey,
> >
> >> we just added a new FAQ entry for upcoming CP 3.2 release that
> >> answers your question. I just c&p it here. More concrete answer
> >> below.
> >
> >>>>> If you get an exception similar to the one shown below,
> >>>>> there are multiple possible causes:
> >>>>>
> >>>>> Exception in thread "StreamThread-1"
> >>>>> java.lang.IllegalArgumentException: Invalid timestamp -1 at
> >>>>> org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
> >>>>>
> >>>>> This error means that the timestamp extractor of your Kafka
> >>>>>  Streams application failed to extract a valid timestamp
> >>>>> from a record. Typically, this points to a problem with the
> >>>>> record (e.g., the record does not contain a timestamp at
> >>>>> all), but it could also indicate a problem or bug in the
> >>>>> timestamp extractor used by the application.
> >>>>>
> >>>>> When does a record not contain a valid timestamp:
> >>>>>
> >>>>> If you are using the default
> >>>>> ConsumerRecordTimestampExtractor, it is most likely that
> >>>>> your records do not carry an embedded timestamp (embedded
> >>>>> record timestamps got introduced in Kafka's message format
> >>>>> in Kafka 0.10). This might happen, if you consume a topic
> >>>>> that is written by old Kafka producer clients (ie, version
> >>>>> 0.9 or earlier) or third party producer clients. A common
> >>>>> situation where this may happen is after upgrading your
> >>>>> Kafka cluster from 0.9 to 0.10, where all the data that was
> >>>>> generated with 0.9 is not compatible with the 0.10 message
> >>>>> format. If you are using a custom timestamp extractor, make
> >>>>> sure that your extractor is robust to missing timestamps in
> >>>>> your records. For example, you can return a default or
> >>>>> estimated timestamp if you cannot extract a valid timestamp
> >>>>> (maybe the timestamp field in your data is just missing).
> >>>>> You can also switch to processing time semantics via
> >>>>> WallclockTimestampExtractor; whether such a fallback is an
> >>>>> appropriate response to this situation depends on your use
> >>>>> case. However, as a first step you should identify and fix
> >>>>> the root cause for why such problematic records were
> >>>>> written to Kafka in the first place. In a second step you
> >>>>> may consider applying workarounds (as described above) when
> >>>>> dealing with such records (for example, if you need to
> >>>>> process those records after all). Another option is to
> >>>>> regenerate the records with correct timestamps and write
> >>>>> them to a new Kafka topic.
> >>>>>
> >>>>> When the timestamp extractor causes the problem:
> >>>>>
> >>>>> In this situation you should debug and fix the erroneous
> >>>>> extractor. If the extractor is built into Kafka, please
> >>>>> report the bug to the Kafka developer mailing list at
> >>>>> dev@kafka.apache.org (see instructions
> >>>>> http://kafka.apache.org/contact); in the meantime, you may
> >>>>> write a custom timestamp extractor that fixes the problem
> >>>>> and configure your application to use that extractor for
> >>>>> the time being.
> >
> >
> >> To address your questions more concretely:
> >
> >> 1. Yes and no: yes for any new data you write to your topic; no
> >> for any already written data that does not have a valid
> >> timestamp set.
> >> 2. Default is CreateTime.
> >> 3. Config parameter "message.timestamp.type". It's a broker-side
> >> per-topic setting (however, be aware that the Java KafkaProducer
> >> does verify the timestamp locally before sending the message to the
> >> broker, thus on -1 there will be the client-side exception you did
> >> observe).
> >> 4. I assume that you do consume different topics with different TS
> >> fields in your records.
> >
> >> Also have a look at:
> >> http://docs.confluent.io/current/streams/concepts.html#time
> >
> >
> >
> >> -Matthias
> >
> >
> >> On 10/28/16 5:42 AM, Debasish Ghosh wrote:
> >>>>> I am actually using 0.10.0 and NOT 0.10.1 as I mentioned
> >>>>> in the last mail. And I am using Kafka within a DC/OS
> >>>>> cluster under AWS.
> >>>>>
> >>>>> The version that I mentioned works ok is on my local
> >>>>> machine using a local Kafka installation. And it works for
> >>>>> both single broker and multi broker scenario.
> >>>>>
> >>>>> Thanks.
> >>>>>
> >>>>> On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh
> >>>>> <gh...@gmail.com> wrote:
> >>>>>
> >>>>>> Hello -
> >>>>>>
> >>>>>> I am a beginner in Kafka .. with my first Kafka streams
> >>>>>> application ..
> >>>>>>
> >>>>>> I have a streams application that reads from a topic,
> >>>>>> does some transformation on the data and writes to
> >>>>>> another topic. The record that I manipulate is a CSV
> >>>>>> record.
> >>>>>>
> >>>>>> It runs fine when I run it on a local Kafka instance.
> >>>>>>
> >>>>>> However when I run it on an AWS cluster, I get the
> >>>>>> following exception when I try to produce the
> >>>>>> transformed record into the target topic.
> >>>>>>
> >>>>>> Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
> >>>>>> at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
> >>>>>> at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
> >>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>>>>> at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
> >>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>>>>> at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
> >>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
> >>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
> >>>>>> at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
> >>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>>>>> at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
> >>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>>>>> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
> >>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
> >>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
> >>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
> >>>>>>
> >>>>>> Looks like the timestamp passed to the ProducerRecord is
> >>>>>> -1, though I am not passing any timestamp explicitly. I
> >>>>>> am not sure why this happens. But I see that the Javadoc
> >>>>>> for ProducerRecord says the following ..
> >>>>>>
> >>>>>> The record also has an associated timestamp. If the user
> >>>>>> did not provide a
> >>>>>>> timestamp, the producer will stamp the record with its
> >>>>>>> current time. The timestamp eventually used by Kafka
> >>>>>>> depends on the timestamp type configured for the
> >>>>>>> topic. If the topic is configured to use CreateTime,
> >>>>>>> the timestamp in the producer record will be used by
> >>>>>>> the broker. If the topic is configured to use
> >>>>>>> LogAppendTime, the timestamp in the producer record
> >>>>>>> will be overwritten by the broker with the broker local
> >>>>>>> time when it appends the message to its log. In either
> >>>>>>> of the cases above, the timestamp that has actually
> >>>>>>> been used will be returned to user in RecordMetadata
> >>>>>>
> >>>>>>
> >>>>>> 1. Will this problem be solved if I configure the topic
> >>>>>> with LogAppendTime or CreateTime explicitly ? 2. What is
> >>>>>> the default setting of this property in a newly created
> >>>>>> topic ? 3. How do I change it (what is the name of the
> >>>>>> property to be set) ? 4. Any idea why I face this
> >>>>>> problem in the cluster mode but not in the local mode ?
> >>>>>>
> >>>>>> BTW I am using 0.10.1.
> >>>>>>
> >>>>>> Any help / pointer will be appreciated ?
> >>>>>>
> >>>>>> regards.
> >>>>>>
> >>>>>> -- Debasish Ghosh http://manning.com/ghosh2
> >>>>>> http://manning.com/ghosh
> >>>>>>
> >>>>>> Twttr: @debasishg Blog: http://debasishg.blogspot.com
> >>>>>> Code: http://github.com/debasishg
> >>>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>
> >
> >
> >
> >
>



-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Re: Problem with timestamp in Producer

Posted by "Matthias J. Sax" <ma...@confluent.io>.
-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA512

Btw: I would highly recommend using Kafka 0.10.1 -- there are many
new Streams features, usability improvements, and bug fixes.

- -Matthias

On 10/28/16 11:42 PM, Matthias J. Sax wrote:
> That sounds reasonable. However, I am wondering how your Streams 
> application can connect to 0.9 broker in the first place. Streams 
> internally uses standard Kafka clients, and those are not backward 
> compatible. Thus, the 0.10 Streams clients should not be able to 
> connect to 0.9 broker.
> 
> In case you do have 0.10 brokers, it might however happen, that
> bin/kafka-console-producer.sh does use a 0.9 producer. Brokers are
> backward compatible, thus a 0.9 producer can write to a 0.10 broker
> (and in this case the record TS would be invalid), while I assume that
> in your local environment you are using the 0.10
> bin/kafka-console-producer.sh and thus all works fine.
> 
> 
> -Matthias
> 
> 
> On 10/28/16 11:00 PM, Debasish Ghosh wrote:
>> Hello Mathias -
> 
>> Thanks a lot for the response. I think what may be happening is
>> a version mismatch between the development & deployment versions
>> of Kafka. The Kafka streams application that I developed uses
>> 0.10.0 based libraries. And my local environment contains a
>> server installation of the same version. Hence it works ok in my
>> local environment.
> 
>> But the DC/OS 1.8 deploys 0.9.0 of Kafka when I install the
>> service through DC/OS cli. And I use this version to load records
>> into the input topic. And try to consume using the deployed
>> streams application which I developed using 0.10.0. Hence the
>> producer did not put the timestamp while the consumer expects to
>> have one.
> 
>> I need to check if 0.10.x is available for DC/OS ..
> 
>> Thanks again for your suggestions.
> 
> 
>> On Sat, Oct 29, 2016 at 3:23 AM, Matthias J. Sax 
>> <ma...@confluent.io> wrote:
> 
>> Hey,
> 
>> we just added a new FAQ entry for upcoming CP 3.2 release that 
>> answers your question. I just c&p it here. More concrete answer 
>> below.
> 
>>>>> If you get an exception similar to the one shown below,
>>>>> there are multiple possible causes:
>>>>> 
>>>>> Exception in thread "StreamThread-1"
>>>>> java.lang.IllegalArgumentException: Invalid timestamp -1 at
>>>>> org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>>>>> 
>>>>> This error means that the timestamp extractor of your Kafka
>>>>>  Streams application failed to extract a valid timestamp
>>>>> from a record. Typically, this points to a problem with the
>>>>> record (e.g., the record does not contain a timestamp at
>>>>> all), but it could also indicate a problem or bug in the
>>>>> timestamp extractor used by the application.
>>>>> 
>>>>> When does a record not contain a valid timestamp:
>>>>> 
>>>>> If you are using the default 
>>>>> ConsumerRecordTimestampExtractor, it is most likely that
>>>>> your records do not carry an embedded timestamp (embedded
>>>>> record timestamps got introduced in Kafka's message format
>>>>> in Kafka 0.10). This might happen, if you consume a topic
>>>>> that is written by old Kafka producer clients (ie, version
>>>>> 0.9 or earlier) or third party producer clients. A common
>>>>> situation where this may happen is after upgrading your
>>>>> Kafka cluster from 0.9 to 0.10, where all the data that was
>>>>> generated with 0.9 is not compatible with the 0.10 message
>>>>> format. If you are using a custom timestamp extractor, make
>>>>> sure that your extractor is robust to missing timestamps in
>>>>> your records. For example, you can return a default or
>>>>> estimated timestamp if you cannot extract a valid timestamp
>>>>> (maybe the timestamp field in your data is just missing).
>>>>> You can also switch to processing time semantics via
>>>>> WallclockTimestampExtractor; whether such a fallback is an
>>>>> appropriate response to this situation depends on your use
>>>>> case. However, as a first step you should identify and fix
>>>>> the root cause for why such problematic records were
>>>>> written to Kafka in the first place. In a second step you
>>>>> may consider applying workarounds (as described above) when
>>>>> dealing with such records (for example, if you need to
>>>>> process those records after all). Another option is to
>>>>> regenerate the records with correct timestamps and write
>>>>> them to a new Kafka topic.
>>>>> 
>>>>> When the timestamp extractor causes the problem:
>>>>> 
>>>>> In this situation you should debug and fix the erroneous 
>>>>> extractor. If the extractor is built into Kafka, please 
>>>>> report the bug to the Kafka developer mailing list at 
>>>>> dev@kafka.apache.org (see instructions 
>>>>> http://kafka.apache.org/contact); in the meantime, you may 
>>>>> write a custom timestamp extractor that fixes the problem 
>>>>> and configure your application to use that extractor for
>>>>> the time being.
> 
> 
>> To address your questions more concretely:
> 
>> 1. Yes and no: yes for any new data you write to your topic; no
>> for any already written data that does not have a valid
>> timestamp set.
>> 2. Default is CreateTime.
>> 3. Config parameter "message.timestamp.type". It's a broker-side
>> per-topic setting (however, be aware that the Java KafkaProducer
>> does verify the timestamp locally before sending the message to the
>> broker, thus on -1 there will be the client-side exception you did
>> observe).
>> 4. I assume that you do consume different topics with different TS
>> fields in your records.
> 
>> Also have a look at: 
>> http://docs.confluent.io/current/streams/concepts.html#time
> 
> 
> 
>> -Matthias
> 
> 
>> On 10/28/16 5:42 AM, Debasish Ghosh wrote:
>>>>> I am actually using 0.10.0 and NOT 0.10.1 as I mentioned
>>>>> in the last mail. And I am using Kafka within a DC/OS
>>>>> cluster under AWS.
>>>>> 
>>>>> The version that I mentioned works ok is on my local
>>>>> machine using a local Kafka installation. And it works for
>>>>> both single broker and multi broker scenario.
>>>>> 
>>>>> Thanks.
>>>>> 
>>>>> On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh 
>>>>> <gh...@gmail.com> wrote:
>>>>> 
>>>>>> Hello -
>>>>>> 
>>>>>> I am a beginner in Kafka .. with my first Kafka streams 
>>>>>> application ..
>>>>>> 
>>>>>> I have a streams application that reads from a topic,
>>>>>> does some transformation on the data and writes to
>>>>>> another topic. The record that I manipulate is a CSV
>>>>>> record.
>>>>>> 
>>>>>> It runs fine when I run it on a local Kafka instance.
>>>>>> 
>>>>>> However when I run it on an AWS cluster, I get the 
>>>>>> following exception when I try to produce the
>>>>>> transformed record into the target topic.
>>>>>> 
>>>>>> Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
>>>>>> at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>>>>>> at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>> at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>> at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
>>>>>> at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>> at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>>>>>> 
>>>>>> Looks like the timestamp passed to the ProducerRecord is 
>>>>>> -1, though I am not passing any timestamp explicitly. I
>>>>>> am not sure why this happens. But I see that the Javadoc
>>>>>> for ProducerRecord says the following ..
>>>>>> 
>>>>>> The record also has an associated timestamp. If the user 
>>>>>> did not provide a
>>>>>>> timestamp, the producer will stamp the record with its 
>>>>>>> current time. The timestamp eventually used by Kafka 
>>>>>>> depends on the timestamp type configured for the
>>>>>>> topic. If the topic is configured to use CreateTime,
>>>>>>> the timestamp in the producer record will be used by
>>>>>>> the broker. If the topic is configured to use
>>>>>>> LogAppendTime, the timestamp in the producer record
>>>>>>> will be overwritten by the broker with the broker local
>>>>>>> time when it appends the message to its log. In either
>>>>>>> of the cases above, the timestamp that has actually
>>>>>>> been used will be returned to user in RecordMetadata
>>>>>> 
>>>>>> 
>>>>>> 1. Will this problem be solved if I configure the topic 
>>>>>> with LogAppendTime or CreateTime explicitly ? 2. What is 
>>>>>> the default setting of this property in a newly created 
>>>>>> topic ? 3. How do I change it (what is the name of the 
>>>>>> property to be set) ? 4. Any idea why I face this
>>>>>> problem in the cluster mode but not in the local mode ?
>>>>>> 
>>>>>> BTW I am using 0.10.1.
>>>>>> 
>>>>>> Any help / pointer will be appreciated ?
>>>>>> 
>>>>>> regards.
>>>>>> 
>>>>>> -- Debasish Ghosh http://manning.com/ghosh2 
>>>>>> http://manning.com/ghosh
>>>>>> 
>>>>>> Twttr: @debasishg Blog: http://debasishg.blogspot.com 
>>>>>> Code: http://github.com/debasishg
>>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>> 
> 
> 
> 
> 

Re: Problem with timestamp in Producer

Posted by Debasish Ghosh <gh...@gmail.com>.
I will check out all the options that you mentioned. I am sure that on my local
setup it's all 0.10.0, so no wonder it works correctly. In the cluster, I just
checked: the version of Kafka that ships with DC/OS 1.8 (the version I get with
dcos package install kafka) is 0.9.0. Regarding ..

> In case you do have 0.10 brokers, it might however happen that
> bin/kafka-console-producer.sh does use a 0.9 producer.


How can I check this ?

Thanks.

On Sat, Oct 29, 2016 at 12:12 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

>
> That sounds reasonable. However, I am wondering how your Streams
> application can connect to a 0.9 broker in the first place. Streams
> internally uses the standard Kafka clients, and those are not backward
> compatible. Thus, the 0.10 Streams clients should not be able to
> connect to a 0.9 broker.
>
> In case you do have 0.10 brokers, it might however happen that
> bin/kafka-console-producer.sh uses a 0.9 producer. Brokers are
> backward compatible, thus a 0.9 producer can write to a 0.10 broker
> (and in this case the record TS would be invalid). I assume that in
> your local environment you are using the 0.10 bin/kafka-console-producer.sh
> and thus all works fine.
>
>
> - -Matthias
>
>
> On 10/28/16 11:00 PM, Debasish Ghosh wrote:
> > Hello Matthias -
> >
> > Thanks a lot for the response. I think what may be happening is a
> > version mismatch between the development & deployment versions of
> > Kafka. The Kafka streams application that I developed uses 0.10.0
> > based libraries. And my local environment contains a server
> > installation of the same version. Hence it works ok in my local
> > environment.
> >
> > But the DC/OS 1.8 deploys 0.9.0 of Kafka when I install the service
> > through DC/OS cli. And I use this version to load records into the
> > input topic. And try to consume using the deployed streams
> > application which I developed using 0.10.0. Hence the producer did
> > not put the timestamp while the consumer expects to have one.
> >
> > I need to check if 0.10.x is available for DC/OS ..
> >
> > Thanks again for your suggestions.
> >
> >
> > On Sat, Oct 29, 2016 at 3:23 AM, Matthias J. Sax
> > <ma...@confluent.io> wrote:
> >
> > Hey,
> >
> > we just added a new FAQ entry for upcoming CP 3.2 release that
> > answers your question. I just c&p it here. More concrete answer
> > below.
> >
> >>>> If you get an exception similar to the one shown below, there
> >>>> are multiple possible causes:
> >>>>
> >>>> Exception in thread "StreamThread-1"
> >>>> java.lang.IllegalArgumentException: Invalid timestamp -1 at
> >>>> org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
> >>>>
> >>>> This error means that the timestamp extractor of your Kafka
> >>>> Streams application failed to extract a valid timestamp from
> >>>> a record. Typically, this points to a problem with the record
> >>>> (e.g., the record does not contain a timestamp at all), but
> >>>> it could also indicate a problem or bug in the timestamp
> >>>> extractor used by the application.
> >>>>
> >>>> When does a record not contain a valid timestamp:
> >>>>
> >>>> If you are using the default
> >>>> ConsumerRecordTimestampExtractor, it is most likely that your
> >>>> records do not carry an embedded timestamp (embedded record
> >>>> timestamps got introduced in Kafka's message format in Kafka
> >>>> 0.10). This might happen, if you consume a topic that is
> >>>> written by old Kafka producer clients (ie, version 0.9 or
> >>>> earlier) or third party producer clients. A common situation
> >>>> where this may happen is after upgrading your Kafka cluster
> >>>> from 0.9 to 0.10, where all the data that was generated with
> >>>> 0.9 is not compatible with the 0.10 message format. If you
> >>>> are using a custom timestamp extractor, make sure that your
> >>>> extractor is robust to missing timestamps in your records.
> >>>> For example, you can return a default or estimated timestamp
> >>>> if you cannot extract a valid timestamp (maybe the timestamp
> >>>> field in your data is just missing). You can also switch to
> >>>> processing time semantics via WallclockTimestampExtractor;
> >>>> whether such a fallback is an appropriate response to this
> >>>> situation depends on your use case. However, as a first step
> >>>> you should identify and fix the root cause for why such
> >>>> problematic records were written to Kafka in the first place.
> >>>> In a second step you may consider applying workarounds (as
> >>>> described above) when dealing with such records (for example,
> >>>> if you need to process those records after all). Another
> >>>> option is to regenerate the records with correct timestamps
> >>>> and write them to a new Kafka topic.
> >>>>
> >>>> When the timestamp extractor causes the problem:
> >>>>
> >>>> In this situation you should debug and fix the erroneous
> >>>> extractor. If the extractor is built into Kafka, please
> >>>> report the bug to the Kafka developer mailing list at
> >>>> dev@kafka.apache.org (see instructions
> >>>> http://kafka.apache.org/contact); in the meantime, you may
> >>>> write a custom timestamp extractor that fixes the problem
> >>>> and configure your application to use that extractor for the
> >>>> time being.
> >
> >
> > To address your questions more concretely:
> >
> > 1. Yes and no: yes, for any new data you write to your topic; no,
> > for any already written data that does not have a valid timestamp
> > set. 2. The default is CreateTime. 3. Config parameter
> > "message.timestamp.type". It's a broker-side, per-topic setting
> > (however, be aware that the Java KafkaProducer verifies the
> > timestamp locally before sending the message to the broker, thus on
> > -1 there will be the client-side exception you did observe). 4. I
> > assume that you consume a different topic with different TS
> > fields in your records.
> >
> > Also have a look at:
> > http://docs.confluent.io/current/streams/concepts.html#time
> >
> >
> >
> > -Matthias
> >
> >
> > On 10/28/16 5:42 AM, Debasish Ghosh wrote:
> >>>> I am actually using 0.10.0 and NOT 0.10.1 as I mentioned in
> >>>> the last mail. And I am using Kafka within a DC/OS cluster
> >>>> under AWS.
> >>>>
> >>>> The version that I mentioned works ok is on my local machine
> >>>> using a local Kafka installation. And it works for both
> >>>> single broker and multi broker scenario.
> >>>>
> >>>> Thanks.
> >>>>
> >>>> On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh
> >>>> <gh...@gmail.com> wrote:
> >>>>
> >>>>> Hello -
> >>>>>
> >>>>> I am a beginner in Kafka .. with my first Kafka streams
> >>>>> application ..
> >>>>>
> >>>>> I have a streams application that reads from a topic, does
> >>>>> some transformation on the data and writes to another
> >>>>> topic. The record that I manipulate is a CSV record.
> >>>>>
> >>>>> It runs fine when I run it on a local Kafka instance.
> >>>>>
> >>>>> However when I run it on an AWS cluster, I get the
> >>>>> following exception when I try to produce the transformed
> >>>>> record into the target topic.
> >>>>>
> >>>>> Exception in thread "StreamThread-1"
> >>>>> java.lang.IllegalArgumentException: Invalid timestamp -1
> >>>>> at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
> >>>>> at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
> >>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>>>> at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
> >>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>>>> at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
> >>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
> >>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
> >>>>> at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
> >>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>>>> at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
> >>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>>>> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
> >>>>> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
> >>>>> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
> >>>>> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
> >>>>>
> >>>>> Looks like the timestamp passed to the ProducerRecord is
> >>>>> -1, though I am not passing any timestamp explicitly. I am
> >>>>> not sure why this happens. But I see that the Javadoc for
> >>>>> ProducerRecord says the following ..
> >>>>>
> >>>>> The record also has an associated timestamp. If the user
> >>>>> did not provide a
> >>>>>> timestamp, the producer will stamp the record with its
> >>>>>> current time. The timestamp eventually used by Kafka
> >>>>>> depends on the timestamp type configured for the topic.
> >>>>>> If the topic is configured to use CreateTime, the
> >>>>>> timestamp in the producer record will be used by the
> >>>>>> broker. If the topic is configured to use LogAppendTime,
> >>>>>> the timestamp in the producer record will be overwritten
> >>>>>> by the broker with the broker local time when it appends
> >>>>>> the message to its log. In either of the cases above, the
> >>>>>> timestamp that has actually been used will be returned
> >>>>>> to user in RecordMetadata
> >>>>>
> >>>>>
> >>>>> 1. Will this problem be solved if I configure the topic
> >>>>> with LogAppendTime or CreateTime explicitly ? 2. What is
> >>>>> the default setting of this property in a newly created
> >>>>> topic ? 3. How do I change it (what is the name of the
> >>>>> property to be set) ? 4. Any idea why I face this problem
> >>>>> in the cluster mode but not in the local mode ?
> >>>>>
> >>>>> BTW I am using 0.10.1.
> >>>>>
> >>>>> Any help / pointer will be appreciated ?
> >>>>>
> >>>>> regards.
> >>>>>
> >>>>> -- Debasish Ghosh http://manning.com/ghosh2
> >>>>> http://manning.com/ghosh
> >>>>>
> >>>>> Twttr: @debasishg Blog: http://debasishg.blogspot.com
> >>>>> Code: http://github.com/debasishg
> >>>>>
> >>>>
> >>>>
> >>>>
> >>
> >
> >
> >
>



-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Re: Problem with timestamp in Producer

Posted by "Matthias J. Sax" <ma...@confluent.io>.

That sounds reasonable. However, I am wondering how your Streams
application can connect to a 0.9 broker in the first place. Streams
internally uses the standard Kafka clients, and those are not backward
compatible. Thus, the 0.10 Streams clients should not be able to
connect to a 0.9 broker.

In case you do have 0.10 brokers, it might however happen that
bin/kafka-console-producer.sh uses a 0.9 producer. Brokers are
backward compatible, thus a 0.9 producer can write to a 0.10 broker
(and in this case the record TS would be invalid). I assume that in
your local environment you are using the 0.10 bin/kafka-console-producer.sh
and thus all works fine.
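
One quick way to check what actually landed in the topic (a rough sketch added
for illustration, not part of the original reply; the broker address, group id,
and topic name are placeholders): consume a few records with a 0.10 consumer
and print ConsumerRecord#timestamp() and #timestampType(). Records written by a
pre-0.10 producer should show -1 and NO_TIMESTAMP_TYPE.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TimestampCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092");   // placeholder
        props.put("group.id", "ts-check");                // placeholder
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("input-topic"));  // placeholder
            ConsumerRecords<String, String> records = consumer.poll(5000);
            for (ConsumerRecord<String, String> r : records) {
                // timestamp() == -1 with NO_TIMESTAMP_TYPE means the record
                // carries no embedded timestamp (pre-0.10 message format)
                System.out.printf("offset=%d timestamp=%d type=%s%n",
                        r.offset(), r.timestamp(), r.timestampType());
            }
        }
    }
}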


- -Matthias


On 10/28/16 11:00 PM, Debasish Ghosh wrote:
> Hello Matthias -
> 
> Thanks a lot for the response. I think what may be happening is a
> version mismatch between the development & deployment versions of
> Kafka. The Kafka streams application that I developed uses 0.10.0
> based libraries. And my local environment contains a server
> installation of the same version. Hence it works ok in my local
> environment.
> 
> But the DC/OS 1.8 deploys 0.9.0 of Kafka when I install the service
> through DC/OS cli. And I use this version to load records into the
> input topic. And try to consume using the deployed streams
> application which I developed using 0.10.0. Hence the producer did
> not put the timestamp while the consumer expects to have one.
> 
> I need to check if 0.10.x is available for DC/OS ..
> 
> Thanks again for your suggestions.
> 
> 
> On Sat, Oct 29, 2016 at 3:23 AM, Matthias J. Sax
> <ma...@confluent.io> wrote:
> 
> Hey,
> 
> we just added a new FAQ entry for upcoming CP 3.2 release that
> answers your question. I just c&p it here. More concrete answer
> below.
> 
>>>> If you get an exception similar to the one shown below, there
>>>> are multiple possible causes:
>>>> 
>>>> Exception in thread "StreamThread-1" 
>>>> java.lang.IllegalArgumentException: Invalid timestamp -1 at 
>>>> org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>>>> 
>>>> This error means that the timestamp extractor of your Kafka 
>>>> Streams application failed to extract a valid timestamp from
>>>> a record. Typically, this points to a problem with the record
>>>> (e.g., the record does not contain a timestamp at all), but
>>>> it could also indicate a problem or bug in the timestamp
>>>> extractor used by the application.
>>>> 
>>>> When does a record not contain a valid timestamp:
>>>> 
>>>> If you are using the default
>>>> ConsumerRecordTimestampExtractor, it is most likely that your
>>>> records do not carry an embedded timestamp (embedded record
>>>> timestamps got introduced in Kafka's message format in Kafka
>>>> 0.10). This might happen, if you consume a topic that is
>>>> written by old Kafka producer clients (ie, version 0.9 or
>>>> earlier) or third party producer clients. A common situation 
>>>> where this may happen is after upgrading your Kafka cluster
>>>> from 0.9 to 0.10, where all the data that was generated with
>>>> 0.9 is not compatible with the 0.10 message format. If you
>>>> are using a custom timestamp extractor, make sure that your
>>>> extractor is robust to missing timestamps in your records.
>>>> For example, you can return a default or estimated timestamp
>>>> if you cannot extract a valid timestamp (maybe the timestamp
>>>> field in your data is just missing). You can also switch to
>>>> processing time semantics via WallclockTimestampExtractor;
>>>> whether such a fallback is an appropriate response to this
>>>> situation depends on your use case. However, as a first step
>>>> you should identify and fix the root cause for why such
>>>> problematic records were written to Kafka in the first place.
>>>> In a second step you may consider applying workarounds (as 
>>>> described above) when dealing with such records (for example,
>>>> if you need to process those records after all). Another
>>>> option is to regenerate the records with correct timestamps
>>>> and write them to a new Kafka topic.
>>>> 
>>>> When the timestamp extractor causes the problem:
>>>> 
>>>> In this situation you should debug and fix the erroneous
>>>> extractor. If the extractor is built into Kafka, please
>>>> report the bug to the Kafka developer mailing list at
>>>> dev@kafka.apache.org (see instructions
>>>> http://kafka.apache.org/contact); in the meantime, you may
>>>> write a custom timestamp extractor that fixes the problem
>>>> and configure your application to use that extractor for the
>>>> time being.
> 
> 
> To address your questions more concretely:
>
> 1. Yes and no: yes, for any new data you write to your topic; no,
> for any already written data that does not have a valid timestamp
> set. 2. The default is CreateTime. 3. Config parameter
> "message.timestamp.type". It's a broker-side, per-topic setting
> (however, be aware that the Java KafkaProducer verifies the
> timestamp locally before sending the message to the broker, thus on
> -1 there will be the client-side exception you did observe). 4. I
> assume that you consume a different topic with different TS
> fields in your records.
> 
> Also have a look at: 
> http://docs.confluent.io/current/streams/concepts.html#time
> 
> 
> 
> -Matthias
> 
> 
> On 10/28/16 5:42 AM, Debasish Ghosh wrote:
>>>> I am actually using 0.10.0 and NOT 0.10.1 as I mentioned in
>>>> the last mail. And I am using Kafka within a DC/OS cluster
>>>> under AWS.
>>>> 
>>>> The version that I mentioned works ok is on my local machine
>>>> using a local Kafka installation. And it works for both
>>>> single broker and multi broker scenario.
>>>> 
>>>> Thanks.
>>>> 
>>>> On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh 
>>>> <gh...@gmail.com> wrote:
>>>> 
>>>>> Hello -
>>>>> 
>>>>> I am a beginner in Kafka .. with my first Kafka streams 
>>>>> application ..
>>>>> 
>>>>> I have a streams application that reads from a topic, does
>>>>> some transformation on the data and writes to another
>>>>> topic. The record that I manipulate is a CSV record.
>>>>> 
>>>>> It runs fine when I run it on a local Kafka instance.
>>>>> 
>>>>> However when I run it on an AWS cluster, I get the
>>>>> following exception when I try to produce the transformed
>>>>> record into the target topic.
>>>>> 
>>>>> Exception in thread "StreamThread-1"
>>>>> java.lang.IllegalArgumentException: Invalid timestamp -1
>>>>> at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>>>>> at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>> at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>> at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
>>>>> at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>> at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>>>>> 
>>>>> Looks like the timestamp passed to the ProducerRecord is
>>>>> -1, though I am not passing any timestamp explicitly. I am
>>>>> not sure why this happens. But I see that the Javadoc for
>>>>> ProducerRecord says the following ..
>>>>> 
>>>>> The record also has an associated timestamp. If the user
>>>>> did not provide a
>>>>>> timestamp, the producer will stamp the record with its
>>>>>> current time. The timestamp eventually used by Kafka
>>>>>> depends on the timestamp type configured for the topic.
>>>>>> If the topic is configured to use CreateTime, the
>>>>>> timestamp in the producer record will be used by the
>>>>>> broker. If the topic is configured to use LogAppendTime,
>>>>>> the timestamp in the producer record will be overwritten
>>>>>> by the broker with the broker local time when it appends
>>>>>> the message to its log. In either of the cases above, the
>>>>>> timestamp that has actually been used will be returned
>>>>>> to user in RecordMetadata
>>>>> 
>>>>> 
>>>>> 1. Will this problem be solved if I configure the topic
>>>>> with LogAppendTime or CreateTime explicitly ? 2. What is
>>>>> the default setting of this property in a newly created
>>>>> topic ? 3. How do I change it (what is the name of the
>>>>> property to be set) ? 4. Any idea why I face this problem
>>>>> in the cluster mode but not in the local mode ?
>>>>> 
>>>>> BTW I am using 0.10.1.
>>>>> 
>>>>> Any help / pointer will be appreciated ?
>>>>> 
>>>>> regards.
>>>>> 
>>>>> -- Debasish Ghosh http://manning.com/ghosh2 
>>>>> http://manning.com/ghosh
>>>>> 
>>>>> Twttr: @debasishg Blog: http://debasishg.blogspot.com
>>>>> Code: http://github.com/debasishg
>>>>> 
>>>> 
>>>> 
>>>> 
>> 
> 
> 
> 

Re: Problem with timestamp in Producer

Posted by Debasish Ghosh <gh...@gmail.com>.
Hello Matthias -

Thanks a lot for the response. I think what may be happening is a version
mismatch between the development & deployment versions of Kafka. The Kafka
streams application that I developed uses 0.10.0 based libraries. And my
local environment contains a server installation of the same version. Hence
it works ok in my local environment.

But the DC/OS 1.8 deploys 0.9.0 of Kafka when I install the service through
DC/OS cli. And I use this version to load records into the input topic. And
try to consume using the deployed streams application which I developed
using 0.10.0. Hence the producer did not put the timestamp while the
consumer expects to have one.

I need to check if 0.10.x is available for DC/OS ..

Thanks again for your suggestions.


On Sat, Oct 29, 2016 at 3:23 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

>
> Hey,
>
> we just added a new FAQ entry for upcoming CP 3.2 release that answers
> your question. I just c&p it here. More concrete answer below.
>
> > If you get an exception similar to the one shown below, there are
> > multiple possible causes:
> >
> > Exception in thread "StreamThread-1"
> > java.lang.IllegalArgumentException: Invalid timestamp -1 at
> > org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
> >
> >  This error means that the timestamp extractor of your Kafka
> > Streams application failed to extract a valid timestamp from a
> > record. Typically, this points to a problem with the record (e.g.,
> > the record does not contain a timestamp at all), but it could also
> > indicate a problem or bug in the timestamp extractor used by the
> > application.
> >
> > When does a record not contain a valid timestamp:
> >
> > If you are using the default ConsumerRecordTimestampExtractor, it
> > is most likely that your records do not carry an embedded
> > timestamp (embedded record timestamps got introduced in Kafka's
> > message format in Kafka 0.10). This might happen, if you consume a
> > topic that is written by old Kafka producer clients (ie, version
> > 0.9 or earlier) or third party producer clients. A common situation
> > where this may happen is after upgrading your Kafka cluster from
> > 0.9 to 0.10, where all the data that was generated with 0.9 is not
> > compatible with the 0.10 message format. If you are using a custom
> > timestamp extractor, make sure that your extractor is robust to
> > missing timestamps in your records. For example, you can return a
> > default or estimated timestamp if you cannot extract a valid
> > timestamp (maybe the timestamp field in your data is just missing).
> > You can also switch to processing time semantics via
> > WallclockTimestampExtractor; whether such a fallback is an
> > appropriate response to this situation depends on your use case.
> > However, as a first step you should identify and fix the root cause
> > for why such problematic records were written to Kafka in the first
> > place. In a second step you may consider applying workarounds (as
> > described above) when dealing with such records (for example, if
> > you need to process those records after all). Another option is to
> > regenerate the records with correct timestamps and write them to a
> > new Kafka topic.
> >
> > When the timestamp extractor causes the problem:
> >
> > In this situation you should debug and fix the erroneous extractor.
> > If the extractor is built into Kafka, please report the bug to the
> > Kafka developer mailing list at dev@kafka.apache.org (see
> > instructions http://kafka.apache.org/contact); in the meantime, you
> > may write a custom timestamp extractor that fixes the problem and
> > configure your application to use that extractor for the time
> > being.
>
>
> To address your questions more concretely:
>
>   1. Yes and no: yes, for any new data you write to your topic; no, for
> any already written data that does not have a valid timestamp set.
>   2. The default is CreateTime.
>   3. Config parameter "message.timestamp.type".
>      It's a broker-side, per-topic setting
>      (however, be aware that the Java KafkaProducer verifies the
> timestamp locally before sending the message to the broker, thus on -1
> there will be the client-side exception you did observe).
>   4. I assume that you consume a different topic with different TS
> fields in your records.
>
> Also have a look at:
> http://docs.confluent.io/current/streams/concepts.html#time
>
>
>
> - -Matthias
>
>
> On 10/28/16 5:42 AM, Debasish Ghosh wrote:
> > I am actually using 0.10.0 and NOT 0.10.1 as I mentioned in the
> > last mail. And I am using Kafka within a DC/OS cluster under AWS.
> >
> > The version that I mentioned works ok is on my local machine using
> > a local Kafka installation. And it works for both single broker and
> > multi broker scenario.
> >
> > Thanks.
> >
> > On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh
> > <gh...@gmail.com> wrote:
> >
> >> Hello -
> >>
> >> I am a beginner in Kafka .. with my first Kafka streams
> >> application ..
> >>
> >> I have a streams application that reads from a topic, does some
> >> transformation on the data and writes to another topic. The
> >> record that I manipulate is a CSV record.
> >>
> >> It runs fine when I run it on a local Kafka instance.
> >>
> >> However when I run it on an AWS cluster, I get the following
> >> exception when I try to produce the transformed record into the
> >> target topic.
> >>
> >> Exception in thread "StreamThread-1"
> >> java.lang.IllegalArgumentException: Invalid timestamp -1
> >> at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
> >> at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
> >> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >> at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
> >> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >> at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
> >> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
> >> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
> >> at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
> >> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >> at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
> >> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
> >> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
> >> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
> >> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
> >>
> >> Looks like the timestamp passed to the ProducerRecord is -1,
> >> though I am not passing any timestamp explicitly. I am not sure
> >> why this happens. But I see that the Javadoc for ProducerRecord
> >> says the following ..
> >>
> >> The record also has an associated timestamp. If the user did not
> >> provide a
> >>> timestamp, the producer will stamp the record with its current
> >>> time. The timestamp eventually used by Kafka depends on the
> >>> timestamp type configured for the topic. If the topic is
> >>> configured to use CreateTime, the timestamp in the producer
> >>> record will be used by the broker. If the topic is configured
> >>> to use LogAppendTime, the timestamp in the producer record will
> >>> be overwritten by the broker with the broker local time when it
> >>> appends the message to its log. In either of the cases above,
> >>> the timestamp that has actually been used will be returned to
> >>> user in RecordMetadata
> >>
> >>
> >> 1. Will this problem be solved if I configure the topic with
> >> LogAppendTime or CreateTime explicitly ? 2. What is the default
> >> setting of this property in a newly created topic ? 3. How do I
> >> change it (what is the name of the property to be set) ? 4. Any
> >> idea why I face this problem in the cluster mode but not in the
> >> local mode ?
> >>
> >> BTW I am using 0.10.1.
> >>
> >> Any help / pointer will be appreciated ?
> >>
> >> regards.
> >>
> >> -- Debasish Ghosh http://manning.com/ghosh2
> >> http://manning.com/ghosh
> >>
> >> Twttr: @debasishg Blog: http://debasishg.blogspot.com Code:
> >> http://github.com/debasishg
> >>
> >
> >
> >
>



-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Re: Problem with timestamp in Producer

Posted by "Matthias J. Sax" <ma...@confluent.io>.

Hey,

we just added a new FAQ entry for upcoming CP 3.2 release that answers
your question. I just c&p it here. More concrete answer below.

> If you get an exception similar to the one shown below, there are
> multiple possible causes:
> 
> Exception in thread "StreamThread-1"
> java.lang.IllegalArgumentException: Invalid timestamp -1 at
> org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>
>  This error means that the timestamp extractor of your Kafka
> Streams application failed to extract a valid timestamp from a
> record. Typically, this points to a problem with the record (e.g.,
> the record does not contain a timestamp at all), but it could also
> indicate a problem or bug in the timestamp extractor used by the
> application.
> 
> When does a record not contain a valid timestamp:
> 
> If you are using the default ConsumerRecordTimestampExtractor, it
> is most likely that your records do not carry an embedded
> timestamp (embedded record timestamps got introduced in Kafka's
> message format in Kafka 0.10). This might happen, if you consume a
> topic that is written by old Kafka producer clients (ie, version
> 0.9 or earlier) or third party producer clients. A common situation
> where this may happen is after upgrading your Kafka cluster from
> 0.9 to 0.10, where all the data that was generated with 0.9 is not
> compatible with the 0.10 message format. If you are using a custom
> timestamp extractor, make sure that your extractor is robust to
> missing timestamps in your records. For example, you can return a
> default or estimated timestamp if you cannot extract a valid
> timestamp (maybe the timestamp field in your data is just missing).
> You can also switch to processing time semantics via
> WallclockTimestampExtractor; whether such a fallback is an
> appropriate response to this situation depends on your use case. 
> However, as a first step you should identify and fix the root cause
> for why such problematic records were written to Kafka in the first
> place. In a second step you may consider applying workarounds (as
> described above) when dealing with such records (for example, if
> you need to process those records after all). Another option is to
> regenerate the records with correct timestamps and write them to a
> new Kafka topic.
> 
> When the timestamp extractor causes the problem:
> 
> In this situation you should debug and fix the erroneous extractor.
> If the extractor is built into Kafka, please report the bug to the
> Kafka developer mailing list at dev@kafka.apache.org (see
> instructions http://kafka.apache.org/contact); in the meantime, you
> may write a custom timestamp extractor that fixes the problem and
> configure your application to use that extractor for the time
> being.
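
As a minimal sketch of the "robust extractor" idea above (added here for
illustration only; it assumes the single-argument TimestampExtractor interface
of the 0.10.0 Streams API, and the class name is made up), you could fall back
to wall-clock time whenever the embedded timestamp is missing:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Falls back to wall-clock time when a record carries no valid embedded
// timestamp (e.g. it was written with the pre-0.10 message format).
public class FallbackTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        long ts = record.timestamp();
        return ts >= 0 ? ts : System.currentTimeMillis();
    }
}

Such an extractor would then be registered in the Streams configuration via the
"timestamp.extractor" setting (StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG),
pointing at the class name.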


To address your questions more concretely:

  1. Yes and no: yes, for any new data you write to your topic; no, for
any already written data that does not have a valid timestamp set.
  2. The default is CreateTime.
  3. Config parameter "message.timestamp.type".
     It's a broker-side, per-topic setting
     (however, be aware that the Java KafkaProducer verifies the
timestamp locally before sending the message to the broker, thus on -1
there will be the client-side exception you did observe).
  4. I assume that you consume a different topic with different TS
fields in your records.

Also have a look at:
http://docs.confluent.io/current/streams/concepts.html#time
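
For completeness, a minimal 0.10-style producer sketch (added for illustration;
the broker address, topic name, key, and value are placeholders) that stamps
each record explicitly using the ProducerRecord constructor that takes a
timestamp. If you pass no timestamp, the 0.10 Java producer fills in its own
current time, so records written this way always carry a valid CreateTime;
whether that CreateTime or the broker's LogAppendTime ends up in the log is
controlled by the "message.timestamp.type" topic setting mentioned in point 3.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExplicitTimestampProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092");   // placeholder
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // ProducerRecord(topic, partition, timestamp, key, value):
            // explicit CreateTime timestamp; partition is left to the
            // default partitioner by passing null.
            producer.send(new ProducerRecord<>("target-topic", null,
                    System.currentTimeMillis(), "key", "some,csv,value"));
        }
    }
}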



- -Matthias


On 10/28/16 5:42 AM, Debasish Ghosh wrote:
> I am actually using 0.10.0 and NOT 0.10.1 as I mentioned in the
> last mail. And I am using Kafka within a DC/OS cluster under AWS.
> 
> The version that I mentioned works ok is on my local machine using
> a local Kafka installation. And it works for both single broker and
> multi broker scenario.
> 
> Thanks.
> 
> On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh
> <gh...@gmail.com> wrote:
> 
>> Hello -
>> 
>> I am a beginner in Kafka .. with my first Kafka streams
>> application ..
>> 
>> I have a streams application that reads from a topic, does some 
>> transformation on the data and writes to another topic. The
>> record that I manipulate is a CSV record.
>> 
>> It runs fine when I run it on a local Kafka instance.
>> 
>> However when I run it on an AWS cluster, I get the following
>> exception when I try to produce the transformed record into the
>> target topic.
>> 
>> Exception in thread "StreamThread-1"
>> java.lang.IllegalArgumentException: Invalid timestamp -1
>> at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>> at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>> at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>> at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
>> at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>> at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
>> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
>> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
>> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>> 
>> Looks like the timestamp passed to the ProducerRecord is -1,
>> though I am not passing any timestamp explicitly. I am not sure
>> why this happens. But I see that the Javadoc for ProducerRecord
>> says the following ..
>> 
>> The record also has an associated timestamp. If the user did not
>> provide a
>>> timestamp, the producer will stamp the record with its current
>>> time. The timestamp eventually used by Kafka depends on the
>>> timestamp type configured for the topic. If the topic is
>>> configured to use CreateTime, the timestamp in the producer
>>> record will be used by the broker. If the topic is configured
>>> to use LogAppendTime, the timestamp in the producer record will
>>> be overwritten by the broker with the broker local time when it
>>> appends the message to its log. In either of the cases above,
>>> the timestamp that has actually been used will be returned to
>>> user in RecordMetadata
>> 
>> 
>> 1. Will this problem be solved if I configure the topic with 
>> LogAppendTime or CreateTime explicitly ? 2. What is the default
>> setting of this property in a newly created topic ? 3. How do I
>> change it (what is the name of the property to be set) ? 4. Any
>> idea why I face this problem in the cluster mode but not in the 
>> local mode ?
>> 
>> BTW I am using 0.10.1.
>> 
>> Any help / pointer will be appreciated ?
>> 
>> regards.
>> 
>> -- Debasish Ghosh http://manning.com/ghosh2 
>> http://manning.com/ghosh
>> 
>> Twttr: @debasishg Blog: http://debasishg.blogspot.com Code:
>> http://github.com/debasishg
>> 
> 
> 
> 

Re: Problem with timestamp in Producer

Posted by Debasish Ghosh <gh...@gmail.com>.
I am actually using 0.10.0 and NOT 0.10.1 as I mentioned in the last mail.
And I am using Kafka within a DC/OS cluster under AWS.

The version that I mentioned works ok is on my local machine using a local
Kafka installation. And it works for both single broker and multi broker
scenario.

Thanks.

On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh <gh...@gmail.com>
wrote:

> Hello -
>
> I am a beginner in Kafka .. with my first Kafka streams application ..
>
> I have a streams application that reads from a topic, does some
> transformation on the data and writes to another topic. The record that I
> manipulate is a CSV record.
>
> It runs fine when I run it on a local Kafka instance.
>
> However when I run it on an AWS cluster, I get the following exception
> when I try to produce the transformed record into the target topic.
>
> Exception in thread "StreamThread-1"
> java.lang.IllegalArgumentException: Invalid timestamp -1
> at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
> at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
> at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>
> Looks like the timestamp passed to the ProducerRecord is -1, though I am
> not passing any timestamp explicitly. I am not sure why this happens. But I
> see that the Javadoc for ProducerRecord says the following ..
>
> The record also has an associated timestamp. If the user did not provide a
>> timestamp, the producer will stamp the record with its current time. The
>> timestamp eventually used by Kafka depends on the timestamp type configured
>> for the topic.
>> If the topic is configured to use CreateTime, the timestamp in the
>> producer record will be used by the broker.
>> If the topic is configured to use LogAppendTime, the timestamp in the
>> producer record will be overwritten by the broker with the broker local
>> time when it appends the message to its log.
>> In either of the cases above, the timestamp that has actually been used
>> will be returned to user in RecordMetadata
>
>
>    1. Will this problem be solved if I configure the topic with
>    LogAppendTime or CreateTime explicitly ?
>    2. What is the default setting of this property in a newly created
>    topic ?
>    3. How do I change it (what is the name of the property to be set) ?
>    4. Any idea why I face this problem in the cluster mode but not in the
>    local mode ?
>
> BTW I am using 0.10.1.
>
> Any help / pointer will be appreciated ?
>
> regards.
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>



-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg