Posted to dev@kafka.apache.org by Shaolu Xu <sh...@tibco-support.com> on 2016/05/25 03:24:16 UTC

Kafka server contains messages but consumer cannot receive any(Producer used compression.type = snappy)

Hi dev,

Kafka version: 0.9.0, language: Java

When using Kafka, I can set a codec by setting the *compression.type=snappy*
property of my Kafka producer.

I use snappy compression in my producer, and I can see the messages with
kafkaMonitor. But when I consume the messages from Kafka using a
consumer, I cannot receive any messages.
Should I do something to *decode the data from snappy, or set some
configuration when creating the consumer*?


Thanks in advance.


Thanks,
Nicole

Re: Kafka server contains messages but consumer cannot receive any(Producer used compression.type = snappy)

Posted by Shaolu Xu <sh...@tibco-support.com>.
Hi Jason,

Sorry for the late reply.
Your solution solved my issue. When I use "earliest" for auto.offset.reset,
the consumer can receive messages.


Thanks a lot,
Nicole
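
For readers landing on this thread, the change confirmed above can be sketched as follows. This is a minimal illustration, not the poster's actual project: the class name EarliestConsumerProps and the group id "demo-group" are hypothetical, and plain string keys are used instead of the ConsumerConfig constants so that the snippet compiles without the kafka-clients jar. Note that auto.offset.reset only applies when the group has no committed offset for a partition (or the committed offset is out of range).

```java
import java.util.Properties;

// Sketch only: builds the consumer settings from the thread plus auto.offset.reset.
// Plain string keys are used so this compiles without kafka-clients on the classpath.
class EarliestConsumerProps {

    static Properties build(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9090"); // address as posted in the thread
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // The one addition: start from the beginning of the log when the
        // group has no committed offset, instead of the default "latest".
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("demo-group"); // hypothetical group id
        System.out.println("auto.offset.reset=" + props.getProperty("auto.offset.reset"));
        // With kafka-clients available, these properties would be passed to
        // new KafkaConsumer<Integer, String>(props).
    }
}
```

No consumer-side setting is needed for snappy itself; the fix here concerns only where the consumer starts reading.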

On Thu, May 26, 2016 at 11:21 AM, Jason Gustafson <ja...@confluent.io>
wrote:

> Hi Nicole,
>
> It would help to see some log files. I cannot view the image you've tried
> to include above. I took a quick glance at your code and noticed that your
> producer only writes 50 messages before it stops. Since you have not
> overridden auto.offset.reset in the consumer config, the default behavior
> will be to start the consumer at the latest offset. My guess is that the
> producer might be finishing all of its writes before the consumer sets its
> initial offset. In that case, the consumer will start at offset 50 and
> won't receive anything because nothing further is written. Try using
> "earliest" for auto.offset.reset and see if that works. If not, attach some
> logs and we'll try to help.
>
> Thanks,
> Jason

Re: Kafka server contains messages but consumer cannot receive any(Producer used compression.type = snappy)

Posted by Jason Gustafson <ja...@confluent.io>.
Hi Nicole,

It would help to see some log files. I cannot view the image you've tried
to include above. I took a quick glance at your code and noticed that your
producer only writes 50 messages before it stops. Since you have not
overridden auto.offset.reset in the consumer config, the default behavior
will be to start the consumer at the latest offset. My guess is that the
producer might be finishing all of its writes before the consumer sets its
initial offset. In that case, the consumer will start at offset 50 and
won't receive anything because nothing further is written. Try using
"earliest" for auto.offset.reset and see if that works. If not, attach some
logs and we'll try to help.

Thanks,
Jason
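
The reasoning above can be illustrated with a toy model. This is not Kafka code: OffsetResetDemo, startingOffset and read are hypothetical names, the log is just an in-memory list, and the "none" policy (which makes the real consumer throw when no offset is found) is deliberately not modeled. It only shows why a consumer that joins after 50 messages were written sees nothing with "latest" and everything with "earliest".

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one partition log; it imitates the offset reset policy only.
class OffsetResetDemo {

    /** Position chosen for a consumer that has no committed offset. */
    static long startingOffset(String autoOffsetReset, long logStartOffset, long logEndOffset) {
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset; // oldest record still in the log
            case "latest":
                return logEndOffset;   // one past the last written record
            default:
                throw new IllegalArgumentException("not modeled: " + autoOffsetReset);
        }
    }

    /** Records a consumer would see if it starts at the given offset and nothing more is written. */
    static List<String> read(List<String> log, long start) {
        return new ArrayList<>(log.subList((int) start, log.size()));
    }

    public static void main(String[] args) {
        // The producer finishes its 50 writes before the consumer picks a position.
        List<String> log = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            log.add("message-" + i);
        }

        long latest = startingOffset("latest", 0, log.size());
        long earliest = startingOffset("earliest", 0, log.size());

        System.out.println("latest   -> start " + latest + ", records " + read(log, latest).size());
        System.out.println("earliest -> start " + earliest + ", records " + read(log, earliest).size());
    }
}
```

Run as is, the model reports a start position of 50 with 0 records for "latest", and a start position of 0 with 50 records for "earliest".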

On Wed, May 25, 2016 at 6:37 PM, Shaolu Xu <sh...@tibco-support.com> wrote:

> Added the project.

Re: Kafka server contains messages but consumer cannot receive any(Producer used compression.type = snappy)

Posted by Shaolu Xu <sh...@tibco-support.com>.
Added the project.

On Thu, May 26, 2016 at 9:25 AM, Shaolu Xu <sh...@tibco-support.com> wrote:

> Hi Tom,
>
> The following is my producer and consumer configuration:
>
>    - Producer:
>
>        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9090");
>        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
>        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>        props.put("batch.size", 16384);
>        props.put("linger.ms", 10);
>        props.put("compression.type", "snappy");
>        KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(props);
>
>    - Consumer:
>
>        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9090");
>        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
>        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
>        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
>        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
>        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
>        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
>        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);
>
> This is the consumer debug snapshot:
>
> [image: Inline image 1]
> The node IP is correct.
> I am also attaching my project. Thanks in advance.
>
> Thanks,
> Nicole

Re: Kafka server contains messages but consumer cannot receive any(Producer used compression.type = snappy)

Posted by Shaolu Xu <sh...@tibco-support.com>.
Hi Tom,

The following is my producer and consumer configuration:

   - Producer:

       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9090");
       props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
       props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
       props.put("batch.size", 16384);
       props.put("linger.ms", 10);
       props.put("compression.type", "snappy");
       KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(props);

   - Consumer:

       props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9090");
       props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
       props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
       props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
       props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
       props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
       props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
       KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);

This is the consumer debug snapshot:

[image: Inline image 1]
The node IP is correct.
I am also attaching my project. Thanks in advance.

Thanks,
Nicole

On Wed, May 25, 2016 at 6:24 PM, Tom Crayford <tc...@heroku.com> wrote:

> Which consumer are you using? Can you see it connecting to the broker in
> the broker logs? I'd recommend putting your configs for producer, consumer
> and broker in a reply to assist debugging. Also please attach any relevant
> code or log files.
>
> Thanks
>
> Tom Crayford
> Heroku Kafka

Re: Kafka server contains messages but consumer cannot receive any(Producer used compression.type = snappy)

Posted by Tom Crayford <tc...@heroku.com>.
Which consumer are you using? Can you see it connecting to the broker in
the broker logs? I'd recommend putting your configs for producer, consumer
and broker in a reply to assist debugging. Also please attach any relevant
code or log files.

Thanks

Tom Crayford
Heroku Kafka

On Wednesday, 25 May 2016, Shaolu Xu <sh...@tibco-support.com> wrote:

> Hi All,
>
> Does anyone have an idea about this? Please help me find the issue.
>
> Thanks,
> Nicole

Re: Kafka server contains messages but consumer cannot receive any(Producer used compression.type = snappy)

Posted by Shaolu Xu <sh...@tibco-support.com>.
Hi All,

Does anyone have an idea about this? Please help me find the issue.

Thanks,
Nicole
