Posted to users@kafka.apache.org by Craig Ching <cr...@gmail.com> on 2018/07/23 14:10:58 UTC

Debugging message timestamps in Sarama

Hi!

I'm working on debugging a problem with how message timestamps are handled
in the sarama client.  In some cases, the sarama client won't associate a
timestamp with a message while the kafka console consumer does.  I've found
the documentation on the message format here:

https://kafka.apache.org/documentation/#messageformat

But the information there is very sparse.  For instance, what are
'firstTimestamp' and 'maxTimestamp'?  It seems that when I'm debugging
sarama, firstTimestamp is set to -1 and maxTimestamp appears to be the
timestamp I want.  Is there some state about the message that I need to
understand in order to have maxTimestamp be used?  Any further
documentation or guidance on this would be very helpful!

On another note, I am trying to debug this through the Scala/Java console
consumer, but I'm having a hard time getting IntelliJ set up.  Is there
anything special I need to do, or documentation I should follow, to set this
up for debugging?

Re: Debugging message timestamps in Sarama

Posted by Dmitriy Vsekhvalnov <dv...@gmail.com>.
Interesting. Well, I'd say open an issue on Sarama's GitHub; you'll probably
find answers and ideas faster there.

On Wed, Jul 25, 2018 at 7:51 PM Craig Ching <cr...@gmail.com> wrote:

> This didn’t fix my problem unfortunately.  Both time stamps are 0.

Re: Debugging message timestamps in Sarama

Posted by Craig Ching <cr...@gmail.com>.
Unfortunately, this didn’t fix my problem.  Both timestamps are 0.


> On Jul 24, 2018, at 15:22, Craig Ching <cr...@gmail.com> wrote:
> 
> Hey, thanks for that Dmitriy!  I'll have a look.

Re: Debugging message timestamps in Sarama

Posted by Craig Ching <cr...@gmail.com>.
Hey, thanks for that, Dmitriy!  I'll have a look.

On Tue, Jul 24, 2018 at 11:18 AM Dmitriy Vsekhvalnov <dv...@gmail.com>
wrote:

> Not really associated with Sarama.
>
> But your issues sounds pretty much same i faced some time ago and fixed,
> here it is: https://github.com/Shopify/sarama/issues/885
>
> Try using msg.BlockTimestamp instead of msg.Timestamp and see if it helps.

Re: Debugging message timestamps in Sarama

Posted by Dmitriy Vsekhvalnov <dv...@gmail.com>.
Not really associated with Sarama.

But your issue sounds pretty much the same as one I faced some time ago and
fixed; here it is: https://github.com/Shopify/sarama/issues/885

Try using msg.BlockTimestamp instead of msg.Timestamp and see if it helps.

On Tue, Jul 24, 2018 at 3:26 AM Craig Ching <cr...@gmail.com> wrote:

> Hi Dmitry,
>
> Are you associated with the Sarama project?  If so, understand that part of
> what I want is to learn about Sarama and the Kafka message format ;)

Re: Debugging message timestamps in Sarama

Posted by Craig Ching <cr...@gmail.com>.
Hi Dmitriy,

Are you associated with the Sarama project?  If so, understand that part of
what I want is to learn about Sarama and the Kafka message format ;)

The problem I'm having is that if I turn on:

log.message.timestamp.type=LogAppendTime

in the broker, then produce on topic1 with console producer, I will see
timestamps in the sarama client.  If I produce on topic2 with telegraf
(incidentally, I think telegraf is a sarama producer), then I don't see
timestamps in the sarama client.  In both cases, if I consume using the
console consumer (with --property print.timestamp=true) I *do* see
timestamps.

I'm happy to debug this issue myself and submit a PR to sarama, but I am
missing some fundamentals of how to decode the kafka message format and
would really like some pointers.

Cheers,
Craig

P.S.  Here is the sarama code I'm using to test:

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	// Initialize Sarama logging
	sarama.Logger = log.New(os.Stdout, "[Sarama] ",
		log.Ldate|log.Lmicroseconds|log.Lshortfile)

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.ClientID = "consumer-test"
	config.Metadata.RefreshFrequency = time.Duration(5) * time.Minute
	config.Metadata.Full = true
	// config.Version = sarama.V0_10_0_0
	config.Version = sarama.V1_1_0_0
	// config.Version = sarama.V0_10_2_1
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	brokers := []string{"localhost:9092"}
	// brokers := []string{"measurement-kafka-broker.service.tgt-pe-prod-ttc.consul.c-prod.ost.cloud.target.internal:9092"}

	client, err := sarama.NewConsumer(brokers, config)
	if err != nil {
		panic(err)
	}
	defer func() {
		if err := client.Close(); err != nil {
			panic(err)
		}
	}()

	// topic := "topic1"
	topic := "topic2"
	// topic := "metric-influx-measurement"
	// How to decide partition, is it fixed value...?
	consumer, err := client.ConsumePartition(topic, 0, sarama.OffsetOldest)
	if err != nil {
		panic(err)
	}
	// Deferred calls run last-in first-out, so the partition consumer is
	// closed before the parent consumer.
	defer consumer.Close()

	// Count how many messages were processed
	msgCount := 0

	// Consume in the main goroutine so that only one reader waits on signals.
loop:
	for {
		select {
		case err := <-consumer.Errors():
			fmt.Println(err)
		case msg := <-consumer.Messages():
			msgCount++
			fmt.Println(msg.Timestamp)
			fmt.Println("Received messages", string(msg.Key), string(msg.Value))
		case <-signals:
			fmt.Println("Interrupt is detected")
			// A bare break would only leave the select, so break the labeled loop.
			break loop
		}
	}
	fmt.Println("Processed", msgCount, "messages")
}


On Mon, Jul 23, 2018 at 10:43 AM Dmitriy Vsekhvalnov <dv...@gmail.com>
wrote:

> Hey Craig,
>
> what exact problem you have with Sarama client?

Re: Debugging message timestamps in Sarama

Posted by Dmitriy Vsekhvalnov <dv...@gmail.com>.
Hey Craig,

What exact problem do you have with the Sarama client?
