You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Li Li <fa...@gmail.com> on 2014/06/23 06:10:23 UTC

high level consumer not working

hi all,
   I am reading the book "apache kafka" and write a simple producer
and consumer class. the producer works but the consumer hangs.
   The producer class:
public static void main(String[] args) {
    String topic="test-topic";
    Properties props = new Properties();
    props.put("metadata.broker.list","linux157:9092");
    props.put("serializer.class","kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    ProducerConfig config = new ProducerConfig(props);
    Producer<Integer, String> producer = new Producer<Integer,String>(config);
    for(int i=0;i<100;i++){
        KeyedMessage<Integer, String> data = new
KeyedMessage<Integer,String>(topic, "msg"+i);
        producer.send(data);
    }
    producer.close();

}

public class TestKafkaConsumer {
private final ConsumerConnector consumer;
private final String topic;

public TestKafkaConsumer(String zookeeper, String groupId, String topic) {
Properties props = new Properties();
props.put("zookeeper.connect", zookeeper);
props.put("group.id", groupId);
props.put("zookeeper.session.timeout.ms", "500");
props.put("zookeeper.sync.time.ms", "250");
props.put("auto.commit.interval.ms", "1000");
consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(
props));
this.topic = topic;
}

public void testConsumer() {
    Map<String, Integer> topicCount = new HashMap<String, Integer>();
    // Define single thread for topic
    topicCount.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer
             .createMessageStreams(topicCount);
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
    for (final KafkaStream stream : streams) {
        ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
        while (consumerIte.hasNext())
            System.out.println("Message from Single Topic :: "
                       + new String(consumerIte.next().message()));
        }
    if (consumer != null)
        consumer.shutdown();
}

public static void main(String[] args) {
    String topic = "test-topic";
    TestKafkaConsumer simpleHLConsumer = new
TestKafkaConsumer("linux157:2181", "testgroup22", topic);
    simpleHLConsumer.testConsumer();

}

}

Re: high level consumer not working

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Li Li,

If you use the same consumer group id then offsets may have already been
committed to Kafka, hence messages before that will not be consumed.

Guozhang


On Mon, Jun 23, 2014 at 6:09 PM, Li Li <fa...@gmail.com> wrote:

> no luck by adding props.put("auto.offset.reset", "smallest");
> but running consumer after producer works.
> But in my use case, it's not always true for this.
> Another question, The consumer should remember the offset. it's not
> very easy to use.
>
> On Mon, Jun 23, 2014 at 11:05 PM, Guozhang Wang <wa...@gmail.com>
> wrote:
> > Did you start the consumer after the producer? The default behavior of
> the
> > consumer is to "consume from the tail of the log", and hence if there is
> no
> > new messages coming in after the consumer started, it will get nothing.
> You
> > may set
> >
> > auto.offset.reset="smallest"
> >
> > and try again.
> >
> > Guozhang
> >
> >
> > On Sun, Jun 22, 2014 at 9:10 PM, Li Li <fa...@gmail.com> wrote:
> >
> >> hi all,
> >>    I am reading the book "apache kafka" and write a simple producer
> >> and consumer class. the producer works but the consumer hangs.
> >>    The producer class:
> >> public static void main(String[] args) {
> >>     String topic="test-topic";
> >>     Properties props = new Properties();
> >>     props.put("metadata.broker.list","linux157:9092");
> >>     props.put("serializer.class","kafka.serializer.StringEncoder");
> >>     props.put("request.required.acks", "1");
> >>     ProducerConfig config = new ProducerConfig(props);
> >>     Producer<Integer, String> producer = new
> >> Producer<Integer,String>(config);
> >>     for(int i=0;i<100;i++){
> >>         KeyedMessage<Integer, String> data = new
> >> KeyedMessage<Integer,String>(topic, "msg"+i);
> >>         producer.send(data);
> >>     }
> >>     producer.close();
> >>
> >> }
> >>
> >> public class TestKafkaConsumer {
> >> private final ConsumerConnector consumer;
> >> private final String topic;
> >>
> >> public TestKafkaConsumer(String zookeeper, String groupId, String
> topic) {
> >> Properties props = new Properties();
> >> props.put("zookeeper.connect", zookeeper);
> >> props.put("group.id", groupId);
> >> props.put("zookeeper.session.timeout.ms", "500");
> >> props.put("zookeeper.sync.time.ms", "250");
> >> props.put("auto.commit.interval.ms", "1000");
> >> consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(
> >> props));
> >> this.topic = topic;
> >> }
> >>
> >> public void testConsumer() {
> >>     Map<String, Integer> topicCount = new HashMap<String, Integer>();
> >>     // Define single thread for topic
> >>     topicCount.put(topic, new Integer(1));
> >>     Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
> >> consumer
> >>              .createMessageStreams(topicCount);
> >>     List<KafkaStream<byte[], byte[]>> streams =
> consumerStreams.get(topic);
> >>     for (final KafkaStream stream : streams) {
> >>         ConsumerIterator<byte[], byte[]> consumerIte =
> stream.iterator();
> >>         while (consumerIte.hasNext())
> >>             System.out.println("Message from Single Topic :: "
> >>                        + new String(consumerIte.next().message()));
> >>         }
> >>     if (consumer != null)
> >>         consumer.shutdown();
> >> }
> >>
> >> public static void main(String[] args) {
> >>     String topic = "test-topic";
> >>     TestKafkaConsumer simpleHLConsumer = new
> >> TestKafkaConsumer("linux157:2181", "testgroup22", topic);
> >>     simpleHLConsumer.testConsumer();
> >>
> >> }
> >>
> >> }
> >>
> >
> >
> >
> > --
> > -- Guozhang
>



-- 
-- Guozhang

Re: high level consumer not working

Posted by Li Li <fa...@gmail.com>.
no luck by adding props.put("auto.offset.reset", "smallest");
but running consumer after producer works.
But in my use case, it's not always true for this.
Another question, The consumer should remember the offset. it's not
very easy to use.

On Mon, Jun 23, 2014 at 11:05 PM, Guozhang Wang <wa...@gmail.com> wrote:
> Did you start the consumer after the producer? The default behavior of the
> consumer is to "consume from the tail of the log", and hence if there is no
> new messages coming in after the consumer started, it will get nothing. You
> may set
>
> auto.offset.reset="smallest"
>
> and try again.
>
> Guozhang
>
>
> On Sun, Jun 22, 2014 at 9:10 PM, Li Li <fa...@gmail.com> wrote:
>
>> hi all,
>>    I am reading the book "apache kafka" and write a simple producer
>> and consumer class. the producer works but the consumer hangs.
>>    The producer class:
>> public static void main(String[] args) {
>>     String topic="test-topic";
>>     Properties props = new Properties();
>>     props.put("metadata.broker.list","linux157:9092");
>>     props.put("serializer.class","kafka.serializer.StringEncoder");
>>     props.put("request.required.acks", "1");
>>     ProducerConfig config = new ProducerConfig(props);
>>     Producer<Integer, String> producer = new
>> Producer<Integer,String>(config);
>>     for(int i=0;i<100;i++){
>>         KeyedMessage<Integer, String> data = new
>> KeyedMessage<Integer,String>(topic, "msg"+i);
>>         producer.send(data);
>>     }
>>     producer.close();
>>
>> }
>>
>> public class TestKafkaConsumer {
>> private final ConsumerConnector consumer;
>> private final String topic;
>>
>> public TestKafkaConsumer(String zookeeper, String groupId, String topic) {
>> Properties props = new Properties();
>> props.put("zookeeper.connect", zookeeper);
>> props.put("group.id", groupId);
>> props.put("zookeeper.session.timeout.ms", "500");
>> props.put("zookeeper.sync.time.ms", "250");
>> props.put("auto.commit.interval.ms", "1000");
>> consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(
>> props));
>> this.topic = topic;
>> }
>>
>> public void testConsumer() {
>>     Map<String, Integer> topicCount = new HashMap<String, Integer>();
>>     // Define single thread for topic
>>     topicCount.put(topic, new Integer(1));
>>     Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
>> consumer
>>              .createMessageStreams(topicCount);
>>     List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
>>     for (final KafkaStream stream : streams) {
>>         ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
>>         while (consumerIte.hasNext())
>>             System.out.println("Message from Single Topic :: "
>>                        + new String(consumerIte.next().message()));
>>         }
>>     if (consumer != null)
>>         consumer.shutdown();
>> }
>>
>> public static void main(String[] args) {
>>     String topic = "test-topic";
>>     TestKafkaConsumer simpleHLConsumer = new
>> TestKafkaConsumer("linux157:2181", "testgroup22", topic);
>>     simpleHLConsumer.testConsumer();
>>
>> }
>>
>> }
>>
>
>
>
> --
> -- Guozhang

Re: high level consumer not working

Posted by Neha Narkhede <ne...@gmail.com>.
 for (final KafkaStream stream : streams) {
        ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
        while (consumerIte.hasNext())
            System.out.println("Message from Single Topic :: "
                       + new String(consumerIte.next().
message()));
        }

Besides what Guozhang suggested, this code has a bug. Since each of the
streams is blocking, you will have to start each stream in a separate
thread. Please take a look at
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

Thanks,
Neha


On Mon, Jun 23, 2014 at 8:05 AM, Guozhang Wang <wa...@gmail.com> wrote:

> Did you start the consumer after the producer? The default behavior of the
> consumer is to "consume from the tail of the log", and hence if there is no
> new messages coming in after the consumer started, it will get nothing. You
> may set
>
> auto.offset.reset="smallest"
>
> and try again.
>
> Guozhang
>
>
> On Sun, Jun 22, 2014 at 9:10 PM, Li Li <fa...@gmail.com> wrote:
>
> > hi all,
> >    I am reading the book "apache kafka" and write a simple producer
> > and consumer class. the producer works but the consumer hangs.
> >    The producer class:
> > public static void main(String[] args) {
> >     String topic="test-topic";
> >     Properties props = new Properties();
> >     props.put("metadata.broker.list","linux157:9092");
> >     props.put("serializer.class","kafka.serializer.StringEncoder");
> >     props.put("request.required.acks", "1");
> >     ProducerConfig config = new ProducerConfig(props);
> >     Producer<Integer, String> producer = new
> > Producer<Integer,String>(config);
> >     for(int i=0;i<100;i++){
> >         KeyedMessage<Integer, String> data = new
> > KeyedMessage<Integer,String>(topic, "msg"+i);
> >         producer.send(data);
> >     }
> >     producer.close();
> >
> > }
> >
> > public class TestKafkaConsumer {
> > private final ConsumerConnector consumer;
> > private final String topic;
> >
> > public TestKafkaConsumer(String zookeeper, String groupId, String topic)
> {
> > Properties props = new Properties();
> > props.put("zookeeper.connect", zookeeper);
> > props.put("group.id", groupId);
> > props.put("zookeeper.session.timeout.ms", "500");
> > props.put("zookeeper.sync.time.ms", "250");
> > props.put("auto.commit.interval.ms", "1000");
> > consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(
> > props));
> > this.topic = topic;
> > }
> >
> > public void testConsumer() {
> >     Map<String, Integer> topicCount = new HashMap<String, Integer>();
> >     // Define single thread for topic
> >     topicCount.put(topic, new Integer(1));
> >     Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
> > consumer
> >              .createMessageStreams(topicCount);
> >     List<KafkaStream<byte[], byte[]>> streams =
> consumerStreams.get(topic);
> >     for (final KafkaStream stream : streams) {
> >         ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
> >         while (consumerIte.hasNext())
> >             System.out.println("Message from Single Topic :: "
> >                        + new String(consumerIte.next().message()));
> >         }
> >     if (consumer != null)
> >         consumer.shutdown();
> > }
> >
> > public static void main(String[] args) {
> >     String topic = "test-topic";
> >     TestKafkaConsumer simpleHLConsumer = new
> > TestKafkaConsumer("linux157:2181", "testgroup22", topic);
> >     simpleHLConsumer.testConsumer();
> >
> > }
> >
> > }
> >
>
>
>
> --
> -- Guozhang
>

Re: high level consumer not working

Posted by Guozhang Wang <wa...@gmail.com>.
Did you start the consumer after the producer? The default behavior of the
consumer is to "consume from the tail of the log", and hence if there is no
new messages coming in after the consumer started, it will get nothing. You
may set

auto.offset.reset="smallest"

and try again.

Guozhang


On Sun, Jun 22, 2014 at 9:10 PM, Li Li <fa...@gmail.com> wrote:

> hi all,
>    I am reading the book "apache kafka" and write a simple producer
> and consumer class. the producer works but the consumer hangs.
>    The producer class:
> public static void main(String[] args) {
>     String topic="test-topic";
>     Properties props = new Properties();
>     props.put("metadata.broker.list","linux157:9092");
>     props.put("serializer.class","kafka.serializer.StringEncoder");
>     props.put("request.required.acks", "1");
>     ProducerConfig config = new ProducerConfig(props);
>     Producer<Integer, String> producer = new
> Producer<Integer,String>(config);
>     for(int i=0;i<100;i++){
>         KeyedMessage<Integer, String> data = new
> KeyedMessage<Integer,String>(topic, "msg"+i);
>         producer.send(data);
>     }
>     producer.close();
>
> }
>
> public class TestKafkaConsumer {
> private final ConsumerConnector consumer;
> private final String topic;
>
> public TestKafkaConsumer(String zookeeper, String groupId, String topic) {
> Properties props = new Properties();
> props.put("zookeeper.connect", zookeeper);
> props.put("group.id", groupId);
> props.put("zookeeper.session.timeout.ms", "500");
> props.put("zookeeper.sync.time.ms", "250");
> props.put("auto.commit.interval.ms", "1000");
> consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(
> props));
> this.topic = topic;
> }
>
> public void testConsumer() {
>     Map<String, Integer> topicCount = new HashMap<String, Integer>();
>     // Define single thread for topic
>     topicCount.put(topic, new Integer(1));
>     Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
> consumer
>              .createMessageStreams(topicCount);
>     List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
>     for (final KafkaStream stream : streams) {
>         ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
>         while (consumerIte.hasNext())
>             System.out.println("Message from Single Topic :: "
>                        + new String(consumerIte.next().message()));
>         }
>     if (consumer != null)
>         consumer.shutdown();
> }
>
> public static void main(String[] args) {
>     String topic = "test-topic";
>     TestKafkaConsumer simpleHLConsumer = new
> TestKafkaConsumer("linux157:2181", "testgroup22", topic);
>     simpleHLConsumer.testConsumer();
>
> }
>
> }
>



-- 
-- Guozhang